(二)丶RabbitMQ的六大核心

news2025/1/23 17:26:53

一丶什么是MQ

        Message Queue(消息队列)简称MQ,是一种应用程序对应用程序的消息通信机制。在MQ中,消息以队列形式存储,以便于异步传输,在MQ中,发布者(生产者)将消息放入队列,而消费者从队列中读取并处理这些消息,这种设计允许生产者和消费者之间解耦,提高系统的响应速度和吞吐量,MQ常用于解耦系统之间的依赖关系,提高系统的稳定性和可扩展性,MQ还支持消峰,即以稳定的系统资源应对突发的流量冲剂,然而使用MQ也可能带来一些挑战,如:系统可用性降低、系统复杂度提高、以及消息一致性问题等;

二丶常见MQ有哪些

        目前业界有很多的MQ产品,例如RabbitMQ、RocketMQ、Kafka、ZeroMQ、MetaMQ等,也有直接使用Redis充当消息队列的案列,而这些消息队列产品,各有侧重,在实际选型时,需要结合自身需求及MQ的产品特征等,综合考虑;

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是Erlang语言编写,而集群和故障转移是构建在开放电信平台框架上的。所有的主要变成语言均有与代理接口通讯的客户端库。

三丶RabbitMQ六种消息模式

        1.Simple Work Queue(简单工作队列):常见的一对一模式,一条消息由一个消费者进行消费。如果有多个消费者,默认是使用轮询的方式将消息分配消费者

        2.Work Queues(工作队列模式):也叫公屏队列,能者多劳的消息队列模型。队列必须收到来自消费者的手动ACK(消息确认机制)才可以继续往消费者发送消息。

        3.Publish/Subscribe(发布订阅模式):一条消息被多和消费者消费。

        4.Ruoting(路由模式):有选择的接收消息。

        5.Topics(主题模式):通过一定的规则来选择性的接收消息

        6.RPC模式:发布者发布消息,并且通过RPC方式等待结果。

(1)Simple Work Queue(简单工作队列)

        消息生产后将消息放入队列,消息的消费者(consumer)监听消息队列嘛,如果队列中有消息就消费掉,消息被消费后自动从消息队列中删除。(也可能出现异常)

/**
 * @Description:获取RabbitMQ连接
 * @Author: xy丶
 */
public class RabbitMQConnection {
    public final static String RABBITMQ_SERVER_HOST = "192.168.0.122";
    public final static int RABBITMQ_SERVER_PORT = 5672;
    public final static String VIRTUAL_HOST = "/XY_HOST";

    public static Connection getConnection() throws IOException, TimeoutException {
        //1、创建连接
        ConnectionFactory factory = new ConnectionFactory();
        //2、设置主机名
        factory.setHost(RABBITMQ_SERVER_HOST);
        //3、设置通讯端口,默认是5672,不专门设置也可以
        factory.setPort(RABBITMQ_SERVER_PORT);
        //4、设置账号和密码
        factory.setUsername("admin");
        factory.setPassword("admin");
        //4、设置Virtual Host
        factory.setVirtualHost(VIRTUAL_HOST);
        //5、创建连接
        return factory.newConnection();
    }

}

       消费者

/**
 * @Description:简单工作队列模式消费者
 * @Author: xy丶
 */
@Slf4j
public class Consumer {
    private final static String SIMPLE_WORK_QUEUE= "simple_work_queue";

    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        //获取连接
        Connection connection = RabbitMQConnection.getConnection();
        //获取通道
        Channel channel = connection.createChannel();

        channel.queueDeclare(SIMPLE_WORK_QUEUE, false, false, false, null);
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(SIMPLE_WORK_QUEUE, true, consumer);
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        log.info("Consumer reception "+message);
    }
}

生产者

/**
 * @Description:简单工作队列模式生产者
 * @Author: xy丶
 */
@Slf4j
public class Producer {
    private final static String SIMPLE_WORK_QUEUE = "simple_work_queue";

    /**
     * 简单工作队列模式
     * @param args
     * @throws IOException
     * @throws TimeoutException
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接
        Connection connection = RabbitMQConnection.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        // 声明队列 String var1, 是否持久化
        // boolean var2, 是否排外 只允许该channel访问该队列 为true只能有一个消费者
        // boolean var3, 是否自动删除
        // boolean var4, 消费完删除
        // Map<String, Object> var5 其他属性
        channel.queueDeclare(SIMPLE_WORK_QUEUE, false, false, false, null);
        // 消息内容 String var1, 是否持久化
        // boolean var2, 是否排外 只允许该channel访问该队列 为true只能有一个消费者
        // boolean var3, 是否自动删除
        // boolean var4, 消费完删除
        // Map<String, Object> var5 其他属性
        String message = "Hello Word!!!";
        channel.basicPublish("", SIMPLE_WORK_QUEUE,null,message.getBytes());
        log.info("Producer send "+message);
        //最后关闭通关和连接
        channel.close();
        connection.close();
    }
       (2).Work Queues(工作队列模式) 创建生产者两个消费者看看效果

        生产者

package com.puwang.MQ.workQueue;

import com.puwang.MQ.config.RabbitMQConnection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

/**
 * @Description:工作队列模式生产者
 * @Author: xy丶
 */
@Slf4j
public class Producer {

    private final static String QUEUE_WORK = "QUEUE_WORK";

    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQConnection.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_WORK, false, false, false, null);
        for(int i = 0; i < 20; i++){
            String message = "娃哈哈" + i;
            channel.basicPublish("", QUEUE_WORK, null, message.getBytes());
            System.out.println("send=============="+message);
            Thread.sleep(i*10);
        }

        channel.close();
        connection.close();

    }
}



@Slf4j
public class WorkQueueConsumer1 {
    private final static  String QUEUE_WORK = "QUEUE_WORK";

    /**
     * 结果:
     *
     * 1、一条消息只会被一个消费者接收;
     *
     * 2、rabbit采用轮询的方式将消息是平均发送给消费者的;
     *
     * 3、消费者在处理完某条消息后,才会收到下一条消息。
     * @param args
     * @throws IOException
     * @throws TimeoutException
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = RabbitMQConnection.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_WORK, false,false, false,null);
        //同一时刻服务器只会发送一条消息给消费者
        channel.basicQos(1);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        //关于手工确认 待之后有时间研究下
        channel.basicConsume(QUEUE_WORK, false, consumer);

        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            log.info("[消费者1] Received1 '"+message+"'");
            Thread.sleep(10);
            //返回确认状态
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

@Slf4j
public class WorkQueueConsumer2 {
    private final static  String QUEUE_WORK = "QUEUE_WORK";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = RabbitMQConnection.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_WORK, false,false, false,null);
        //同一时刻服务器只会发送一条消息给消费者
        channel.basicQos(1);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        //关于手工确认 待之后有时间研究下
        channel.basicConsume(QUEUE_WORK, false, consumer);

        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            log.info("[消费者2] Received1 '"+message+"'");
            Thread.sleep(10);
            //返回确认状态
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}




       (3).Publish/Subscribe(发布订阅模式)

        生产者

/**
 * 订阅模式 生产者
 * 订阅模式:一个生产者发送的消息会被多个消费者获取。
 * 消息产生者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费
 * 相关场景:邮件群发,群聊天,广播(广告)
 * @Description:发布订阅模式生产者
 * @Author: xy丶
 */
@Slf4j
public class Producer {
    private final static String PUBLISH_SUBSCRIBE_EXCHANGE = "Publish_subscribe_exchange";

    /**
     * 交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费
     * 相关场景:邮件群发,群聊天,广播(广告)
     * @param args
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //获取连接
        Connection connection = RabbitMQConnection.getConnection();
        //从连接中获取一个通道
        Channel channel = connection.createChannel();
         消费者绑定交换机 参数1 队列 参数2 交换机 参数3 routingKey
        channel.exchangeDeclare(PUBLISH_SUBSCRIBE_EXCHANGE, "fanout");
        //发送消息
        for (int i = 0; i < 10; i++) {
            String message = "哇哈哈哈!!!"+i;
            log.info("send message:" + message);
            //发送消息
            channel.basicPublish(PUBLISH_SUBSCRIBE_EXCHANGE, "", null, message.getBytes("utf-8"));
            Thread.sleep(100 * i);
        }
        channel.close();
        connection.close();
    }
}

消费者

/**
 * @Description:发布订阅模式消费者
 * @Author: xy丶
 */
@Slf4j
public class PublishSubscribeConsumer1 {
    //交换机名称
    private final static String PUBLISH_SUBSCRIBE_EXCHANGE = "Publish_subscribe_exchange";

    //队列名称
    private static final String PUBLISH_SUBSCRIBE_QUEUE    = "publish_subscribe_queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        //获取连接
        Connection connection = RabbitMQConnection.getConnection();
        //从连接中获取一个通道
        final Channel channel = connection.createChannel();
        //声明交换机(分发:发布/订阅模式)
        channel.exchangeDeclare(PUBLISH_SUBSCRIBE_EXCHANGE, "fanout");
        //声明队列
        channel.queueDeclare(PUBLISH_SUBSCRIBE_QUEUE, false, false, false, null);
        //将队列绑定到交换机
        channel.queueBind(PUBLISH_SUBSCRIBE_QUEUE, PUBLISH_SUBSCRIBE_EXCHANGE, "");
        //保证一次只分发一个
        int prefetchCount = 1;
        channel.basicQos(prefetchCount);
        //定义消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            //当消息到达时执行回调方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "utf-8");
                log.info("[PublishSubscribeConsumer1] Receive message:" + message);
                try {
                    //消费者休息2s处理业务
                    Thread.sleep(1000);
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
                finally {
                    System.out.println("[1] done");
                    //手动应答
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        //设置手动应答
        boolean autoAck = false;
        //监听队列
        channel.basicConsume(PUBLISH_SUBSCRIBE_QUEUE, autoAck, consumer);
    }
}

/**
 * @Description:发布订阅模式消费者
 * @Author: xy丶
 */
@Slf4j
public class PublishSubscribeConsumer1 {
    //交换机名称
    private final static String PUBLISH_SUBSCRIBE_EXCHANGE = "Publish_subscribe_exchange";

    //队列名称
    private static final String PUBLISH_SUBSCRIBE_QUEUE    = "publish_subscribe_queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        //获取连接
        Connection connection = RabbitMQConnection.getConnection();
        //从连接中获取一个通道
        final Channel channel = connection.createChannel();
        //声明交换机(分发:发布/订阅模式)
        channel.exchangeDeclare(PUBLISH_SUBSCRIBE_EXCHANGE, "fanout");
        //声明队列
        channel.queueDeclare(PUBLISH_SUBSCRIBE_QUEUE, false, false, false, null);
        //将队列绑定到交换机
        channel.queueBind(PUBLISH_SUBSCRIBE_QUEUE, PUBLISH_SUBSCRIBE_EXCHANGE, "");
        //保证一次只分发一个
        int prefetchCount = 1;
        channel.basicQos(prefetchCount);
        //定义消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            //当消息到达时执行回调方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "utf-8");
                log.info("[PublishSubscribeConsumer1] Receive message:" + message);
                try {
                    //消费者休息2s处理业务
                    Thread.sleep(1000);
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
                finally {
                    System.out.println("[1] done");
                    //手动应答
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        //设置手动应答
        boolean autoAck = false;
        //监听队列
        channel.basicConsume(PUBLISH_SUBSCRIBE_QUEUE, autoAck, consumer);
    }
}



/**
 * @Description:发布订阅模式消费者
 * @Author: xy丶
 */
@Slf4j
public class PublishSubscribeConsumer2 {
    //交换机名称
    private final static String PUBLISH_SUBSCRIBE_EXCHANGE = "Publish_subscribe_exchange";

    //队列名称
    private static final String PUBLISH_SUBSCRIBE_QUEUE    = "publish_subscribe_queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        //获取连接
        Connection connection = RabbitMQConnection.getConnection();
        //从连接中获取一个通道
        final Channel channel = connection.createChannel();
        //声明交换机(分发:发布/订阅模式)
        channel.exchangeDeclare(PUBLISH_SUBSCRIBE_EXCHANGE, "fanout");
        //声明队列
        channel.queueDeclare(PUBLISH_SUBSCRIBE_QUEUE, false, false, false, null);
        //将队列绑定到交换机
        channel.queueBind(PUBLISH_SUBSCRIBE_QUEUE, PUBLISH_SUBSCRIBE_EXCHANGE, "");
        //保证一次只分发一个
        int prefetchCount = 1;
        channel.basicQos(prefetchCount);
        //定义消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            //当消息到达时执行回调方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "utf-8");
                log.info("[PublishSubscribeConsumer2] Receive message:" + message);
                try {
                    //消费者休息2s处理业务
                    Thread.sleep(1000);
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
                finally {
                    System.out.println("[1] done");
                    //手动应答
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        //设置手动应答
        boolean autoAck = false;
        //监听队列
        channel.basicConsume(PUBLISH_SUBSCRIBE_QUEUE, autoAck, consumer);
    }
}






       (4).Ruoting(路由模式)

消费者

/**
 * 1)消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息
 * 2)根据业务功能定义路由字符串
 * 3)从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;
 * 客户通知;利用key路由,可以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误
 * @Description:发布订阅模式生产者
 * @Author: xy丶
 */
@Slf4j
public class Producer {
    //路由交换机名称
    static final String ROUTING_DIRECT_EXCHANGE = "Routing_direct_exchange";
    //队列名称1 发送
    static final String QUEUE_SEND = "queue_send";
    //队列名称2  接收
    static final String QUEUE_RECEIVE = "queue_receive";

    public static void main(String[] args) throws Exception {
        //创建连接
        Connection connection = RabbitMQConnection.getConnection();
        //创建频道
        Channel channel = connection.createChannel();
        /**
         * 声明交换机
         * 参数1:交换机名称
         * 参数2:交换机类型,fanout,toppic,direct,headers
         */
        channel.exchangeDeclare(ROUTING_DIRECT_EXCHANGE, "direct");
        /**
         * 声明(创建)队列
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候自动删除队列
         * 参数5:队列其它参数
         */
        channel.queueDeclare(QUEUE_SEND,true,false,false,null);
        channel.queueDeclare(QUEUE_RECEIVE,true,false,false,null);
        /**
         * 队列绑定交换机
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         */
        channel.queueBind(QUEUE_SEND,ROUTING_DIRECT_EXCHANGE,"send");
        channel.queueBind(QUEUE_RECEIVE,ROUTING_DIRECT_EXCHANGE,"receive");

        //发送消息
        String message = "路由模式:routing key 为 send";
        /**
         * 参数1:交换机名称,如果没有指定则使用默认Default Exchage
         * 参数2:路由key,简单模式可以传递队列名称
         * 参数3:消息其它属性
         * 参数4:消息内容
         */
        channel.basicPublish(ROUTING_DIRECT_EXCHANGE,"send",null,message.getBytes());
        log.info("已发送消息:"+message);

        //发送消息
        message = "路由模式:routing key 为 receive";
        /**
         * 参数1:交换机名称,如果没有指定则使用默认Default Exchage
         * 参数2:路由key,简单模式可以传递队列名称
         * 参数3:消息其它属性
         * 参数4:消息内容
         */
        channel.basicPublish(ROUTING_DIRECT_EXCHANGE,"receive",null,message.getBytes());
        log.info("已发送消息:"+message);

        //关闭资源
        channel.close();
        connection.close();
    }
}

消费者


/**
 *路由消费者
 */
@Slf4j
public class RoutingConsumer1 {

    //路由交换机名称
    static final String ROUTING_DIRECT_EXCHANGE = "Routing_direct_exchange";
    //队列名称1 发送
    static final String QUEUE_SEND = "queue_send";

    public static void main(String[] args) throws Exception {
        //创建连接
        Connection connection = RabbitMQConnection.getConnection();
        //创建通道(频道)
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(ROUTING_DIRECT_EXCHANGE,"direct");
        /**
         * 声明(创建)队列
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候自动删除队列
         * 参数5:队列其它参数
         */
        channel.queueDeclare(QUEUE_SEND,true,false,false,null);

        //队列绑定交换机
        channel.queueBind(QUEUE_SEND, ROUTING_DIRECT_EXCHANGE,"send");

        //创建消费这,并设置消息处理
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * consumerTag 消息者标签,在channel.basicConsume时候可以指定
             * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,
             *          消息和重传标志(收到消息失败后是否需要重新发送)
             * properties 属性信息
             * body 消息
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                log.info("路由key为:" + envelope.getRoutingKey());
                //交换机
                log.info("交换机为:" + envelope.getExchange());
                //消息id
                log.info("消息id为:" + envelope.getDeliveryTag());
                //收到的消息
                log.info("消费者1-接收到的消息为:" + new String(body, "utf8"));
            }
        };

        /**
         * 监听消息
         * 参数1:队列名称
         * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
         * 参数3:消息接收到后回调
         */
        channel.basicConsume(QUEUE_SEND, true, consumer);
    }


/**
 *路由消费者
 */
@Slf4j
public class RoutingConsumer2 {

    //路由交换机名称
    static final String ROUTING_DIRECT_EXCHANGE = "Routing_direct_exchange";
    //队列名称1 发送
    static final String QUEUE_RECEIVE = "queue_receive";

    public static void main(String[] args) throws Exception {
        //创建连接
        Connection connection = RabbitMQConnection.getConnection();
        //创建通道(频道)
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(ROUTING_DIRECT_EXCHANGE,"direct");
        /**
         * 声明(创建)队列
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候自动删除队列
         * 参数5:队列其它参数
         */
        channel.queueDeclare(QUEUE_RECEIVE,true,false,false,null);

        //队列绑定交换机
        channel.queueBind(QUEUE_RECEIVE, ROUTING_DIRECT_EXCHANGE,"receive");

        //创建消费这,并设置消息处理
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * consumerTag 消息者标签,在channel.basicConsume时候可以指定
             * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,
             *          消息和重传标志(收到消息失败后是否需要重新发送)
             * properties 属性信息
             * body 消息
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                log.info("路由key为:" + envelope.getRoutingKey());
                //交换机
                log.info("交换机为:" + envelope.getExchange());
                //消息id
                log.info("消息id为:" + envelope.getDeliveryTag());
                //收到的消息
                log.info("消费者2-接收到的消息为:" + new String(body, "utf8"));
            }
        };

        /**
         * 监听消息
         * 参数1:队列名称
         * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
         * 参数3:消息接收到后回调
         */
        channel.basicConsume(QUEUE_RECEIVE, true, consumer);
    }
}
       (5).Topics(主题模式/路由模式的一种)

生产者

/**
 * 跟 routing 路由模式类似,只不过路由模式是指定固定的路由键 routingKey,而主题模式是可以模糊匹配路由routingKey,类似于SQL中 = 和 like 的关系
 * 消息可能匹配多个消费者,但是同一个队列的中的消息不会被重复消费;
 *
 *要求
 * Topic 模式消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以 “.” 或者 “#” 分隔开。这些单词可以是任意单词,这个单词列表最多不能超过 255 个字节。
 * 分隔符
 * “*(星号)”:可以代替一个单词
 * “#(井号)”:可以替代零个或多个单词
 * @Description:主题模式
 * @Author: xy丶
 */
@Slf4j
public class TopicProducer {

    public static final String TOPIC_EXCHANGE = "topic_exchange";
    public static final String TOPIC_QUEUE_ONE = "topic_queue_one";
    public static final String TOPIC_QUEUE_TWO = "topic_queue_two";

    public static void main(String[] args) throws Exception {
        //声明用作全局变量的队列变量和交换价变量

        //创建连接
        Connection connection = RabbitMQConnection.getConnection();
        //创建信道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(TOPIC_QUEUE_ONE,true,false,false,null);
        channel.queueDeclare(TOPIC_QUEUE_TWO,true,false,false,null);
        //声明交换机
        channel.exchangeDeclare(TOPIC_EXCHANGE, "topic",true);
        //绑定队列
        channel.queueBind(TOPIC_QUEUE_ONE,TOPIC_EXCHANGE,"*.orange.*");
        channel.queueBind(TOPIC_QUEUE_TWO,TOPIC_EXCHANGE,"*.*.rabbit");
        channel.queueBind(TOPIC_QUEUE_TWO,TOPIC_EXCHANGE,"lazy.#");
        //发生消息
        for (int i = 0; i <10 ; i++) {
            String msg="goodnight!My love world===>"+i;
            channel.basicPublish(TOPIC_EXCHANGE,"ag.we.rabbit",null,msg.getBytes());
        }
    }
}

消费者

@Slf4j
public class TopicCustomer1 {

    public static final String TOPIC_QUEUE_ONE="topic_queue_one";

    public static void main(String[] args) throws Exception{
        //创建连接
        Connection connection = RabbitMQConnection.getConnection();
        //创建信道
        Channel channel = connection.createChannel();
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                log.info("TopicCustomer1=====>:"+new String(body));
            }
        };
        channel.basicConsume(TOPIC_QUEUE_ONE,true,consumer);
    }
}


@Slf4j
public class TopicCustomer2 {
    public static final String TOPIC_QUEUE_TWO="topic_queue_two";

    public static void main(String[] args) throws Exception{
        //创建连接
        Connection connection = RabbitMQConnection.getConnection();
        //创建信道
        Channel channel = connection.createChannel();
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                log.info("TopicCustomer22=====>:"+new String(body));
            }
        };
        channel.basicConsume(TOPIC_QUEUE_TWO,true,consumer);
    }
}


@Slf4j
public class TopicCustomer3 {
    public static final String TOPIC_QUEUE_TWO = "topic_queue_two";

    public static void main(String[] args) throws Exception{
        //创建连接
        Connection connection = RabbitMQConnection.getConnection();
        //创建信道
        Channel channel = connection.createChannel();
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                log.info("TopicCustomer2=====>:"+new String(body));
            }
        };
        channel.basicConsume(TOPIC_QUEUE_TWO,true,consumer);
    }
}



       (6).RPC模式

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1520954.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于信号分解的几种一维时间序列降噪方法(MATLAB R2021B)

自适应信号分解算法是一种适合对非平稳信号分析的方法&#xff0c;它将一个信号分解为多个模态叠加的形式&#xff0c;进而可以准确反应信号中所包含的频率分量以及瞬时频率随时间变化的规律。自适应信号分解算法与众多“刚性”方法(如傅里叶变换&#xff0c;小波变换)不同&…

最后十几天!未备案小程序将会被清退

微信官方通知 2023年8月9日&#xff0c;微信公众平台发布了“关于开展微信小程序备案的通知”&#xff1a; 去年就已经在逐步推进备案了&#xff0c;新注册小程序必须备案才可以上架。若微信小程序已上架&#xff0c;须于2024年3月31日前完成备案&#xff0c;逾期未完成备案&a…

苹果大模型MM1强势来袭!300亿参数所向披靡

功能展示 左图问&#xff1a;图片中有多少个苹果&#xff1f; 苹果大模型答&#xff1a;7个 左图问&#xff1a;这些州中哪一个是最北的&#xff1f; 回答&#xff1a;Delaware 追问&#xff1a;其他的州是什么&#xff1f; 回答&#xff1a;Arizona, Oklahoma, North Caro…

Ant Design Pro complete版本的下载及运行

前言 complete 版本提供了很多基础、美观的页面和组件&#xff0c;对于前端不太熟练的小白十分友好&#xff0c;可以直接套用或者修改提供的代码完成自己的页面开发&#xff0c;简直不要太爽。故记录一些下载的步骤。 环境 E:\code>npm -v 9.8.1E:\code>node -v v18.1…

[HTML]Web前端开发技术24(HTML5、CSS3、JavaScript )JavaScript基础JavaScript,Netscape,事件处理代码,外部JS——喵喵画网页

希望你开心&#xff0c;希望你健康&#xff0c;希望你幸福&#xff0c;希望你点赞&#xff01; 最后的最后&#xff0c;关注喵&#xff0c;关注喵&#xff0c;关注喵&#xff0c;佬佬会看到更多有趣的博客哦&#xff01;&#xff01;&#xff01; 喵喵喵&#xff0c;你对我真的…

Java学习笔记(12)

包 导包 Final 不能被改变的&#xff0c;修饰方法 类 变量 方法不能被重写 类不能被继承 变量&#xff0c;赋值一次&#xff0c;变成常量&#xff0c;不能再被赋值 final修饰引用数据类型&#xff0c;地址值不能变&#xff0c;里面的内容可以变 字符串是不可变的 源码中使…

ubuntu(20.04)-安装JAVA环境-IDEA

1.下载IDEA 2.解压文件 sudo tar -zxvf idealC-2022.2.3.tar.gz -C /opt 3.添加环境变量&#xff1a; .vim ~/.bashrc export IDEA_HOME/opt/ideaIC-2022.2.3/ export PATH${IDEA_HOME}/bin:$PATH source ~/.bashrc 4.启动&#xff1a; cd /opt/ideaIC-2…

安装jupyter报错:404 GET /static/notebook/4131.bundle.js

1、报错安装过程 我直接是pip install jupyter 进行的安装&#xff0c;如下&#xff0c;安装的版本是7.1.2 2、报错结果 运行jupyternotebook后报错&#xff1a;404 GET /static/notebook/4131.bundle.js (3bea7012d1534d70a935c3c193d9308d127.0.0.1) 5.70ms refererht…

cf(163)

D. Tandem Repeats? 找最长串联字串的长度 #include<iostream> #include<algorithm> #include<cstring> #include<queue> #include<vector> #include<map> using namespace std; typedef pair<int,int>PII; typedef long long ll…

ThingsBoard Edge 安装部署

文章目录 一、概述1.官方文档2.部署说明3.安装准备3.1. 克隆服务器3.2.安装 Docker3.3.安装 Java 113.4.安装 PostgreSQL3.5.下载安装包 二、安装部署1.创建 Edge 实例2.创建数据库3.Edge 服务安装3.1.安装服务3.2.配置 Edge3.3.运行安装脚本3.4.重新启动服务 4.访问 Edge5.故障…

基于ssm+layui的图书管理系统

基于ssmlayui的图书管理系统 账户类型分为&#xff1a;管理员&#xff0c;用户管理员私有功能用户私有功能公共功能技术栈功能实现图 视频演示 账户类型分为&#xff1a;管理员&#xff0c;用户 图书管理系统主要登录账户类型为管理员账户与用户账户 管理员私有功能 账户管理…

算法的渐进时间复杂度

T(n) = O(F(n)) T(n):Time 渐进时间复杂度 O:正比例关系 F(n):代码执行次数 只要代码执行的次数越来越多 所耗费的时间也就越来越高 常见的5种: O(n^2) O(n logn) O(n) O(logn) O(1):不管重复多少次1次也是这个时间,10次也是这个时间。 时间复杂度排序:由小到…

关于如何在BIOS中设置引导顺序,看这篇文章就差不多了

前言 更改计算机上“可引导”设备(如USB端口、软盘驱动器或光盘驱动器中的硬盘驱动器或可引导介质)的引导顺序非常容易。 为什么要更改引导顺序 有几个场景需要更改引导顺序,例如启动一些数据销毁工具和可引导防病毒程序,以及安装操作系统。 BIOS设置实用程序用于更改引…

鸿蒙Harmony应用开发—ArkTS声明式开发(基础手势:ScrollBar)

滚动条组件ScrollBar&#xff0c;用于配合可滚动组件使用&#xff0c;如List、Grid、Scroll。 说明&#xff1a; 该组件从API Version 8开始支持。后续版本如有新增内容&#xff0c;则采用上角标单独标记该内容的起始版本。 子组件 可以包含单个子组件。 接口 ScrollBar(val…

C++ 入门篇

目录 1、了解C 2、C关键字 2、命名空间 2.1 命名空间的定义 2.2 命名空间的使用 3. C输入与输出 4.缺省参数 4.1 缺省参数的概念 4.2 缺省参数的分类 5. 函数重载 5.1 函数重载的概念 5.2 C中支持函数重载的原理--名字修饰 6. 引用 6.1 引用概念 6.2 引用…

【Miniconda】Linux系统中 .condarc 配置文件的位置一般在哪里

【Miniconda】Linux系统中 .condarc 配置文件的位置一般在哪里 &#x1f308; 个人主页&#xff1a;高斯小哥 &#x1f525; 高质量专栏&#xff1a;Matplotlib之旅&#xff1a;零基础精通数据可视化、Python基础【高质量合集】、PyTorch零基础入门教程&#x1f448; 希望得到…

Elasticsearch 索引库操作 文档操作

索引库就类似数据库表&#xff0c;mapping映射就类似表的结构。要向es中存储数据&#xff0c;必须先创建“库”和“表”。 mapping映射属性 mapping是对索引库中文档的约束&#xff0c;常见的mapping属性包括&#xff1a; type&#xff1a; 字段数据类型&#xff0c;常见的简…

CSS3技巧38:3D 翻转数字效果

博主其它CSS3 3D的文章&#xff1a; CSS3干货4&#xff1a;CSS中3D运用_css 3d-CSDN博客 CSS3干货5&#xff1a;CSS中3D运用-2_中3d-2-CSDN博客 CSS3干货6&#xff1a;CSS中3D运用-3_css3d 使用-CSDN博客 最近工作上烦心的事情太多&#xff0c;只有周末才能让我冷静一下 cod…

黑群晖: 未在 DS918+ 中检测到硬盘 之 解决方案

黑群晖&#xff1a; 未在 DS918 中检测到硬盘 之 解决方案 操作如下&#xff1a; 进入BIOS&#xff0c;将sata operation 设置为 AHCI 即可

ARMv8架构特殊寄存器介绍-0

一、zero 寄存器 零寄存器用作源寄存器时读取零&#xff0c;用作目标寄存器时丢弃结果。您可以在大多数指令中使用零寄存器&#xff0c;但不是所有指令。二、sp寄存器 在ARMv8架构中&#xff0c;要使用的堆栈指针的选择在某种程度上与Exception级别。默认情况下&#xff0c;异…