RabbitMQ消息队列——快速入门

news2025/1/12 18:10:03

目录

1、MQ介绍

1.1、什么是MQ?

1.2、MQ的能够解决的问题

1.2.1、削峰填谷

1.2.3、异步处理

1.3、MQ的选择

1.3.1、Kafka

1.3.2、ActiveMQ

1.3.3、RocketMQ

1.3.4、RabbitMQ

2、RabbitMQ的介绍

2.1、RabbitMQ的概述

2.2、AMQP

2.3、JMS

2.4、RabbitMQ模式

3、RabbitMQ的安装

3.1、安装Erlang

3.2、RabbitMQ安装

3.3、创建用户

3.4、创建虚拟主机Virtual Hosts

4、RabbitMQ工作模式

4.1、简单模式

4.1.1、生产者

4.1.2、消费者

4.2、工作队列模式(Work queues)

4.2.1、生产者 

4.2.2、消费者1

4.2.3、消费者2

​编辑4.3、发布/订阅模式(Publish/Subscribe)

4.3.1、生产者

4.3.2、消费者1

4.3.3、消费者2 

4.4、路由模式(Routing)

4.4.1、生产者

4.4.2、消费者1

4.4.3、消费者2

4.5、通配符模式(Topic)

4.5.1、生产者

4.5.1、消费者1

4.5.2、消费者2

4.6、发布确认模式(Publisher Confirms)

4.6.1、单次确认

4.6.2、批量确认

4.6.3、异步确认

4.6.4、速度对比

4.7、RabbitMQ模式总结


1、MQ介绍

1.1、什么是MQ?

从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是 message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常 见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,消息发送上游只需要依赖 MQ,不用依赖其他服务。

1.2、MQ的能够解决的问题

1.2.1、削峰填谷

以12306为例,假设平时可能买票的人不多,所以订单系统的QPS( 每秒查询率 )也不是很高,每秒也就处理1000个请求,但是一到节假日、春运期间可能抢票的人就非常多,并发量远远大于平时,这个时候,订单系统明显扛不住了。怎么办呢,当然我们可以设计弹性伸缩的集群,进行机器扩容,保证高可用。但是我们依然可以采用MQ来解决这个问题。

MQ的吞吐能力还是还强大的,所以我们可以设计高可用的MQ,让所有的请求都到MQ,缓存起来。这样一来高峰期的流量和数据都将积压在MQ中,流量高峰就被削弱了(削峰),然后我们的订单系统就避免了高并发的请求,它可以慢慢的从MQ中拉取自己能力范围内的消息就行处理。这样一来,高峰期积压的消息也终将被消费完,可以叫做填谷。

1.2.2、应用解耦

产品经理提需求,好多人关注了我们12306微信客户端,我们需要买票成功后在通知微信小程序。那么我们又需要修改订单系统的代码。一次还好,如果隔一段时间发生一件这样的事,那谁能忍受?

某一天,短信系统挂了,然后客户成功买到一张票,然后呢是短信也没收到,邮件也没收到,库存也没扣,这还得了。你短信系统坏了,我邮件系统好好的,凭什么影响我,让客户收不到邮件,这就不合理。 所以呢,还是各个系统之间的耦合太高了,我们应该解耦。不是有人说互联网的任何问题都可以通过一个中间件来解决吗,那么我们看MQ如何帮我们解决这件棘手的事情。

那么我们发现其实短信系统、邮件系统等都只依赖订单系统产生的一条数据那就是订单,因此我们在订单系统产生数据后,将订单这条数据发送给MQ,就返回成功,然后让短信、邮件等系统都订阅MQ,一旦发现MQ有消息,他们主动拉取消息,然后解析,进行业务处理。这样一来,就算你短信系统挂了,丝毫不会影响其他系统,而且如果后来想加一个新的系统,你也不用改订单系统的代码了,你只要订阅我们的MQ提供的消息就行了

1.2.3、异步处理

还以上面12306为例,假设我们不用MQ,那么我们的代码必然耦合在一起,下单成功后,依次要通过RPC远程调用这几个系统,然后同步等到他们的响应才能返回给用户是否成功的结果。假设每个系统耗时200ms,那么就得花费600ms

但是其实有时候我们发现,下单是个核心业务,可能压力本来就大,客户也着急知道下单是否成功,但是短信邮件等通知,可能大多数人根本不急或者压根不关心,那么我们为什么要让这些细枝末节的业务影响核心业务的效率呢,是不是有点舍本逐末。所以这个逻辑我们可以设计成异步的。我们可以当下单成功后,只需要将订单消息发给MQ,然后立即将结果返回通知客户。这才是正确的打开姿势。这样一来,我订单系统只需要告诉你MQ,我下单成功了,其他模块收到消息后,该发短信的发短信,发邮件的发邮件。因为本来MQ的性能就很好,所以这个效率一下就提升了。

1.3、MQ的选择

1.3.1、Kafka

Kafka 主要特点是基于 Pull 的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务。大型公司建议可以选用,如果有日志采集功能,肯定是首选 kafka 了。

1.3.2、ActiveMQ

优点:单机吞吐量万级,时效性ms级,可用性高,基于主从架构实现高可用性,消息可靠性,较低的概率丢失数据。

缺点:维护越来越少,高吞吐量场景较少使用。

1.3.3、RocketMQ

天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况。RoketMQ 在稳定性上可能更值得信赖,这些业务场景在阿里双11已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择 RocketMQ。

1.3.4、RabbitMQ

结合 erlang 语言本身的并发优势,性能好时效性微秒级,社区活跃度也比较高,管理界面用起来十分 方便,如果你的数据量没有那么大,中小型公司优先选择功能比较完备的 RabbitMQ,本章就以RabbitMQ为例。

2、RabbitMQ的介绍

2.1、RabbitMQ的概述

RabbitMQ是由erlang语言开发,基于AMQP协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。

RabbitMQ官方地址:

Messaging that just works — RabbitMQhttp://www.rabbitmq.com/

RabbitMQ的架构图 

2.2、AMQP

AMQP(高级消息队列协议)是一个网络协议。它支持符合要求的客户端应用(application)和消息中间件代理(messaging middleware broker)之间进行通信。主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。 AMQP是协议,类比HTTP。

2.3、JMS

JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的 API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。JSM是API接口规范,类比JDBC。

2.4、RabbitMQ模式

RabbitMQ提供了7种模式:

1、简单模式

2、work-queue工作队列模式

3、Publish/Subscribe发布与订阅模式

4、Routing路由模式

5、Topics主题模式

6、RPC远程调用模式(远程调用,不太算MQ,不作介绍)

7、Publisher Confirms发布确认

官网对应模式介绍:RabbitMQ Tutorials — RabbitMQ

3、RabbitMQ的安装

官方windows安装文档,Installing on Windows — RabbitMQ

安装注意事项: 1、 推荐使用默认的安装路径 2、 系统用户名必须是英文

百度网盘资源

链接:https://pan.baidu.com/s/1C87Piy7co6BWf9v8N-4WrQ 
提取码:c17d

3.1、安装Erlang

RabbitMQ是由erlang语言开发,所以我在安装 RabbitMQ 一定要先安装Erlang环境,注意版本匹配

  • RabbitMQ Erlang Version Requirements — RabbitMQ 查看版本选择

  • https://erlang.org/download/otp_win64_25.0.exe erlang版本下载

  • 右击 otp_win64_25.0.exe 以管理员身份运行 进行安装

  • 安装Erlang只需要下一步下一步即可

3.2、RabbitMQ安装

  • Release RabbitMQ 3.10.10 · rabbitmq/rabbitmq-server · GitHub rabbitMQ版本下载

  • 安装RabbitMQ只需要下一步下一步即可

  • 右击 rabbitmq-server-3.10.10.exe以管理员身份运行 进行安装

安装完后,cmd输入services.msc打开服务,开启RabbitMQ服务

通过windows快捷键直接找到sbin命令,如果没有直接找到安装目录下找到sbin目录,以管理员身份运行。

输入下面命令,启动管理页面。 

 rabbitmq-plugins.bat enable rabbitmq_management

RabbitMQ在安装好后,可以访问 http://localhost:15672 ,其自带了guest/guest 的用户名和密码。

如果以上操作都进行后,仍然访问不到页面,请重启电脑再次测试。

3.3、创建用户

 角色说明:
1、 超级管理员(administrator)
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
2、 监控者(monitoring)
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
3、 策略制定者(policymaker)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
4、 普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
5、 其他
无法登陆管理控制台,通常就是普通的生产者和消费者。

3.4、创建虚拟主机Virtual Hosts

 给用户添加虚拟主机权限

4、RabbitMQ工作模式

首先创建一个Maven项目

导入依赖

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.14.2</version>
</dependency>

4.1、简单模式

只有一个生产者,一个消费者;生产者将消息发送到队列,消费者从队列中获取消息。

创建一个工具类,用来连接RabbitMQ

public class ConnectionUtil {
    public static Connection getConnection() throws Exception {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //主机地址;默认为 localhost
        connectionFactory.setHost("localhost");
        //连接端口;默认为 5672
        connectionFactory.setPort(5672);
        //虚拟主机名称;默认为 /
        connectionFactory.setVirtualHost("/yh");
        //连接用户名;默认为guest
        connectionFactory.setUsername("guest");
        //连接密码;默认为guest
        connectionFactory.setPassword("guest");
        //创建连接
        return connectionFactory.newConnection();
    }
}

4.1.1、生产者

public class Producer {
    static final String QUEUE_NAME="simple_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接
        Connection connection = ConnectionUtil.getConnection();
        //创建通道
        Channel channel = connection.createChannel();

        //声明队列
        /**
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接。只能有一个消费者监听到这队列
         * 参数4:是否在不使用的时候自动删除队列
         * 参数5:队列其它参数
         */
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);

        //发送信息
        String message="hello RabbitMQ";
        /**
         * 参数1:交换机名称,如果没有指定则使用默认Default Exchage
         * 参数2:路由key,简单模式可以传递队列名称
         * 参数3:消息其它属性
         * 参数4:消息内容
         */
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("已发送信息:"+message);

        //关闭资源
        channel.close();
        connection.close();
    }
}

4.1.2、消费者

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接
        Connection connection = ConnectionUtil.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);

        //接收信息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由的key
                System.out.println("路由key为:" + envelope.getRoutingKey());
                //交换机
                System.out.println("交换机:" + envelope.getExchange());
                //消息id
                System.out.println("消息id为:" + envelope.getDeliveryTag());
                //收到的消息
                System.out.println("接收到的消息:" + new String(body, "utf-8"));
            }
        };

        //监听消息
        channel.basicConsume(Producer.QUEUE_NAME, true, consumer);
        //不关闭资源,应该一直监听消息
    }
}

先启动消费者,再启动生产者 

4.2、工作队列模式(Work queues)

Work Queues与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。

应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

4.2.1、生产者 

public class Producer {
    static final String QUEUE_NAME="simple_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接
        Connection connection = ConnectionUtil.getConnection();
        //创建通道
        Channel channel = connection.createChannel();

        //声明队列
        /**
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接。只能有一个消费者监听到这队列
         * 参数4:是否在不
         * 使用的时候自动删除队列
         * 参数5:队列其它参数
         */
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);

        //发送信息
        for (int i = 1; i <=10 ; i++) {
            String message="hello RabbitMQ"+i;
            /**
             * 参数1:交换机名称,如果没有指定则使用默认Default Exchage
             * 参数2:路由key,简单模式可以传递队列名称
             * 参数3:消息其它属性
             * 参数4:消息内容
             */
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println("已发送信息:"+message);
        }

        //关闭资源
        channel.close();
        connection.close();
    }
}

4.2.2、消费者1

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接
        Connection connection = ConnectionUtil.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);

        //接收信息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    //收到的消息
                    System.out.println("消费者1--接收到的消息:" + new String(body, "utf-8"));
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        //监听消息
        channel.basicConsume(Producer.QUEUE_NAME, true, consumer);
        //不关闭资源,应该一直监听消息
    }
}

4.2.3、消费者2

public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接
        Connection connection = ConnectionUtil.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);

        //接收信息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    //收到的消息
                    System.out.println("消费者2--接收到的消息:" + new String(body, "utf-8"));
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        //监听消息
        channel.basicConsume(Producer.QUEUE_NAME, true, consumer);
        //不关闭资源,应该一直监听消息
    }
}

启动两个消费者,然后再启动生产者发送消息;到IDEA的两个消费者对应的控制台查看消费者消费的消息。

4.3、发布/订阅模式(Publish/Subscribe)

一个生产者发送的消息会被多个消费者获取。发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(binding)的所有的Queue上。这种模式不需要任何Routekey,需要提前将Exchange 与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以和多个Exchange绑定。如果接收到消息的Exchange没有与任何Queue绑定,则消息会丢失。

Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:

  • Fanout:广播,将消息交给所有绑定到交换机的队列

  • Direct:定向,把消息交给符合指定routing key 的队列

  • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

4.3.1、生产者

public class Producer {
    //交换机名称
    static final String FANOUT_EXCHANGE="fanout_exchange";
    //队列名称
    static final String FANOUT_QUEUE_1="fanout_queue_1";
    static final String FANOUT_QUEUE_2="fanout_queue_2";
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接
        Connection connection = ConnectionUtil.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);

        //发送信息
        for (int i = 1; i <=10 ; i++) {
            String message="发布/订阅模式--"+i;
            /**
             * 参数1:交换机名称,如果没有指定则使用默认Default Exchange
             * 参数2:路由key,简单模式可以传递队列名称
             * 参数3:消息其它属性
             * 参数4:消息内容
             */
            channel.basicPublish(FANOUT_EXCHANGE,"",null,message.getBytes());
            System.out.println("已发送信息:"+message);
        }

        //关闭资源
        channel.close();
        connection.close();
    }
}

4.3.2、消费者1

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接
        Connection connection = ConnectionUtil.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(Producer.FANOUT_EXCHANGE,BuiltinExchangeType.FANOUT);
        //声明队列
        channel.queueDeclare(Producer.FANOUT_QUEUE_1,true,false,false,null);
        //队列绑定交换机
        channel.queueBind(Producer.FANOUT_QUEUE_1,Producer.FANOUT_EXCHANGE,"");

        //接收信息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //收到的消息
                    System.out.println("消费者1--接收到的消息:" + new String(body, "utf-8"));
            }
        };
        //监听消息
        channel.basicConsume(Producer.FANOUT_QUEUE_1, true, consumer);
        //不关闭资源,应该一直监听消息
    }
}

4.3.3、消费者2 

public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接
        Connection connection = ConnectionUtil.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(Producer.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);
        //声明队列
        channel.queueDeclare(Producer.FANOUT_QUEUE_2, true, false, false, null);
        //队列绑定交换机
        channel.queueBind(Producer.FANOUT_QUEUE_2, Producer.FANOUT_EXCHANGE,"");
        //接收信息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //收到的消息
                System.out.println("消费者2--接收到的消息:" + new String(body, "utf-8"));
            }
        };

        //监听消息
        channel.basicConsume(Producer.FANOUT_QUEUE_2, true, consumer);
        //不关闭资源,应该一直监听消息
    }
}

启动所有消费者,然后使用生产者发送消息;在每个消费者对应的控制台可以查看到生产者发送的所有消息;到达广播的效果。

在执行完测试代码后,其实到RabbitMQ的管理后台找到Exchanges选项卡,点击 fanout_exchange 的交换机,可以查看到如下的绑定:

注意:消息发送到没有队列绑定的交换机时,消息将丢失,因为,交换机没有存储消息的能力,消息只能存在在队列中。

4.4、路由模式(Routing)

任何发送到Direct Exchange的消息都会被转发到RouteKey指定的Queue,这种模式下不需要将Exchange进行任何绑定(binding)操作,消息传递时需要一个RouteKey,可以简单的理解为要发送到的队列名字。

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)

  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey

  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

在编码上与 Publish/Subscribe发布与订阅模式 的区别是交换机的类型为:Direct,还有队列绑定交换机的时候需要指定routing key。

4.4.1、生产者

public class Producer {
    //交换机名称
    static final String DIRECT_EXCHANGE = "direct_exchange";
    //队列名称
    static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";
    static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接
        Connection connection = ConnectionUtil.getConnection();
        //创建通道
        Channel channel = connection.createChannel();

        //声明交换机
        //  参数1:交换机名称
        //  参数2:交换机类型,fanout、topic、direct、headers
        channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);

        // 发送信息到消费者1
        String message = "新增了商品。路由模式;routing key 为 insert ";
        channel.basicPublish(DIRECT_EXCHANGE, "insert", null, message.getBytes());
        System.out.println("已发送消息:" + message);

        // 发送信息到消费者2
        message = "修改了商品。路由模式;routing key 为 update";
        channel.basicPublish(DIRECT_EXCHANGE, "update", null, message.getBytes());
        System.out.println("已发送消息:" + message);

        //关闭资源
        channel.close();
        connection.close();
    }
}

4.4.2、消费者1

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接
        Connection connection = ConnectionUtil.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(Producer.DIRECT_EXCHANGE,BuiltinExchangeType.DIRECT);
        //声明队列
        channel.queueDeclare(Producer.DIRECT_QUEUE_INSERT,true,false,false,null);
        //队列绑定交换机
        channel.queueBind(Producer.DIRECT_QUEUE_INSERT, Producer.DIRECT_EXCHANGE,"insert");

        //接收信息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                System.out.println("路由key为:" + envelope.getRoutingKey());
                //交换机
                System.out.println("交换机为:" + envelope.getExchange());
                //消息id
                System.out.println("消息id为:" + envelope.getDeliveryTag());
                //收到的消息
                System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-8"));
            }
        };
        //监听消息
        channel.basicConsume(Producer.DIRECT_QUEUE_INSERT, true, consumer);
        //不关闭资源,应该一直监听消息
    }
}

4.4.3、消费者2

public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接
        Connection connection = ConnectionUtil.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(Producer.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);
        //声明队列
        channel.queueDeclare(Producer.DIRECT_QUEUE_UPDATE, true, false, false, null);
        //队列绑定交换机
        channel.queueBind(Producer.DIRECT_QUEUE_UPDATE,Producer.DIRECT_EXCHANGE,"update");
        //接收信息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                System.out.println("路由key为:" + envelope.getRoutingKey());
                //交换机
                System.out.println("交换机为:" + envelope.getExchange());
                //消息id
                System.out.println("消息id为:" + envelope.getDeliveryTag());
                //收到的消息
                System.out.println("消费者2-接收到的消息为:" + new String(body, "utf-8"));
            }
        };

        //监听消息
        channel.basicConsume(Producer.DIRECT_QUEUE_UPDATE, true, consumer);
        //不关闭资源,应该一直监听消息
    }
}

启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达按照需要接收的效果。

在执行完测试代码后,其实到RabbitMQ的管理后台找到Exchanges选项卡,点击 direct_exchange 的交换机,可以查看到如下的绑定:

 Routing模式要求队列在绑定交换机时要指定routing key,消息会转发到符合routing key的队列。

4.5、通配符模式(Topic)

任何发送到Topic Exchange的消息都会被转发到所有关心RouteKey指定主题的Queue中。就是每个队列都有其关心的主题,所有的消息都带有一个标题(RouteKey),Exchange会将消息转发到所有关注主题能与RouteKey模糊匹配队列。这种模式需要Routekey并且提前绑定Exchange与Queue。在进行绑定时要提供一个该队列对应的主题。‘ # ’表示0个或若干个关键字,‘ * ’表示一个关键字。如果Exchange没有发现能够与RouteKey匹配的Queue,消息会丢失。

#:匹配一个或多个词

*:匹配不多不少恰好1个词

4.5.1、生产者

public class Producer {
    //交换机名称
    static final String TOPIC_EXCHANGE = "topic_exchange";
    //队列名称
    static final String TOPIC_QUEUE_1 = "topic_queue_1";
    static final String TOPIC_QUEUE_2 = "topic_queue_2";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接
        Connection connection = ConnectionUtil.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //声明交换机
        //  参数1:交换机名称
        //  参数2:交换机类型,fanout、topic、direct、headers
        channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);
        // 发送信息
        String message = "新增了商品。Topic模式;routing key 为 item.insert " ;
        channel.basicPublish(TOPIC_EXCHANGE, "item.insert", null, message.getBytes());
        System.out.println("已发送消息:" + message);

        // 发送信息
        message = "修改了商品。Topic模式;routing key 为 item.update" ;
        channel.basicPublish(TOPIC_EXCHANGE, "item.update", null, message.getBytes());
        System.out.println("已发送消息:" + message);

        // 发送信息
        message = "删除了商品。Topic模式;routing key 为 item.delete" ;
        channel.basicPublish(TOPIC_EXCHANGE, "item.delete", null, message.getBytes());
        System.out.println("已发送消息:" + message);

        //关闭资源
        channel.close();
        connection.close();
    }
}

4.5.1、消费者1

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接
        Connection connection = ConnectionUtil.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(Producer.TOPIC_EXCHANGE,BuiltinExchangeType.TOPIC);
        //声明队列
        channel.queueDeclare(Producer.TOPIC_QUEUE_1,true,false,false,null);
        //队列绑定交换机
        channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHANGE,"item.insert");
        channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHANGE,"item.update");

        //接收信息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //收到的消息
                System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-8"));
            }
        };
        //监听消息
        channel.basicConsume(Producer.TOPIC_QUEUE_1, true, consumer);
        //不关闭资源,应该一直监听消息
    }
}

4.5.2、消费者2

public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接
        Connection connection = ConnectionUtil.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(Producer.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);
        //声明队列
        channel.queueDeclare(Producer.TOPIC_QUEUE_2, true, false, false, null);
        //队列绑定交换机
        channel.queueBind(Producer.TOPIC_QUEUE_2, Producer.TOPIC_EXCHANGE,"item.delete");
        //接收信息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //收到的消息
                System.out.println("消费者2-接收到的消息为:" + new String(body, "utf-8"));
            }
        };

        //监听消息
        channel.basicConsume(Producer.TOPIC_QUEUE_2, true, consumer);
        //不关闭资源,应该一直监听消息
    }
}

启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达按照需要接收的效果;并且这些routing key可以使用通配符。

在执行完测试代码后,其实到RabbitMQ的管理后台找到Exchanges选项卡,点击 topic_exchange 的交换机,可以查看到如下的绑定:

Topic主题模式可以实现 Publish/Subscribe发布与订阅模式Routing路由模式 的功能;只是Topic在配置routing key 的时候可以使用通配符,显得更加灵活。

4.6、发布确认模式(Publisher Confirms)

与发布者进行可靠的发布确认,发布者确认是RabbitMQ扩展,可以实现可靠的发布。在通道上启用发布者确认后,RabbitMQ将异步确认发送者发布的消息,这意味着它们已在服务器端处理。

发布确认模式有三种策略: 1、单次确认 2、批量确认 3、异步确认

4.6.1、单次确认

public class Single {
   //单个确认  267ms
   public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
       //获取连接
       Connection connection = ConnectionUtil.getConnection();
       //创建通道
       Channel channel = connection.createChannel();
       String queueName = UUID.randomUUID().toString();
       //声明队列
       channel.queueDeclare(queueName,true,false,false,null);
       //开启消息确认发布应答模式
       channel.confirmSelect();
       //记录开始时间
       long start = System.currentTimeMillis();
       //发送1000条信息
       for (int i = 1; i <=1000 ; i++) {
           //模拟信息
           String message=i+"";
           channel.basicPublish("",queueName,null,message.getBytes());
           //单个确认
           boolean flag = channel.waitForConfirms();
           if(flag){
               System.out.println("--------第"+(i)+"条信息发送成功!");
           }else{
               System.out.println("=========第"+(i)+"条消息发送失败!");
           }
       }
       //记录结束时间
       long end = System.currentTimeMillis();
       System.out.println("共耗时:"+(end-start)+"ms");
   }
}

4.6.2、批量确认

//批量确认  72ms
public class More {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //获取连接
        Connection connection = ConnectionUtil.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        String queueName = UUID.randomUUID().toString();
        //声明队列
        channel.queueDeclare(queueName,true,false,false,null);
        //开启消息确认发布应答模式
        channel.confirmSelect();
        //记录开始时间
        long start = System.currentTimeMillis();
        //发送1000条信息
        for (int i = 1; i <=1000 ; i++) {
            //模拟信息
            String message=i+"";
            channel.basicPublish("",queueName,null,message.getBytes());
            //批量确认
            if(i%100==0){
                boolean flag = channel.waitForConfirms();
                if(flag){
                    System.out.println("--------第"+(i)+"条信息发送成功!");
                }else{
                    System.out.println("该批消息有确认失败的,需要重新发送整批失败的消息");
                }
            }

        }
        //记录结束时间
        long end = System.currentTimeMillis();
        System.out.println("共耗时:"+(end-start)+"ms");
    }
}

4.6.3、异步确认

//异步确认  46ms
public class Asny {
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取连接
        Connection connection = ConnectionUtil.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        String queueName = UUID.randomUUID().toString();
        //声明队列
        channel.queueDeclare(queueName,true,false,false,null);
        //开启消息确认发布应答模式
        channel.confirmSelect();
        //消息发送成功回调函数
        ConfirmCallback ackCallback=(deliveryTag, multiple)->{
            System.out.println("消息发送成功");
        };
        //消息法送失败回调函数
        ConfirmCallback nackCallback=(deliveryTag, multiple)->{
            System.out.println("消息发送失败");
        };
        //注册监听器监听,异步通知
        channel.addConfirmListener(ackCallback,nackCallback);
        //记录开始时间
        long start = System.currentTimeMillis();
        //发送消息
        int message_num =1000;
        for (int i = 1; i <= message_num; i++) {
            String message=i+"";
            channel.basicPublish("",queueName,null,message.getBytes());
        }
        //记录发消息后时间
        long end=System.currentTimeMillis();
        System.out.println("该模式为异步批量确认模式:"+message_num+",耗时:"+(end-start)+"ms");
    }
}

4.6.4、速度对比

  • 同步-单独发布消息:同步等待确认,简单,但吞吐量非常有限。

  • 同步-批量发布消息:批量同步等待确认,简单,合理的吞吐量,一旦出现问题很难推断出是那条消息出现了问题。

  • 异步处理:最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些。

4.7、RabbitMQ模式总结

  • 不直接Exchange交换机(默认交换机)

    1. simple简单模式: 一个生产者、一个消费者,生产者生产消息到一个队列被一个消费者接收

    2. work Queue工作队列模式: 一个生产者、多个消费者(竞争关系),生产者发送消息到一个队列中,可以被多个消费者监听该队列;一个消息只能被一个消费者接收,消费者之间是竞争关系

  • 使用Exchange交换机;订阅模式(交换机:广播fanout、定向direct、通配符topic)

    1. 发布与订阅模式:使用了fanout广播类型的交换机,可以将一个消息发送到所有绑定了该交换机的队列

    2. 路由模式:使用了direct定向类型的交换机,消费会携带路由key,交换机根据消息的路由key与队列的路由key进行对比,一致的话那么该队列可以接收到消息

    3. 通配符模式:使用了topic通配符类型的交换机,消费会携带路由key(*, #),交换机根据消息的路由key与队列的路由key进行对比,匹配的话那么该队列可以接收到消息

  • 发布确认模式(Publisher Confirms)

    1、单次确认

    2、批量确认

    3、同步处理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/50009.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

关系抽取:传统:UniRel: Unified Representation and Interaction for Joint Relational

针对传统下的三元组抽取提出的一种方法&#xff0c;在NYT和webNLG数据集上&#xff0c;再次刷新榜单。 本来对这个结果不是很确定&#xff0c;但作者公布了源码&#xff0c;we can reformulate it . 很少在看到这种文章了吧。 Core idea 换了一种解释思路。 从entity-entity的…

ARM汇编之乘法指令

ARM汇编之乘法指令前言 首先&#xff0c;请问大家几个小小问题&#xff0c;你清楚&#xff1a; 乘法指令有哪些种类呢&#xff1f;ARM乘法指令具体的使用场景又有哪些&#xff1f; 今天&#xff0c;我们来一起探索并回答这些问题。为了便于大家理解&#xff0c;以下是本文的…

精准诊断,精确治疗,智芯传感ZXPA侵入式压力传感器为心血管疾病患者带来福音

近日&#xff0c;据联合国《世界人口展望2022》报告显示&#xff0c;地球人口已正式步入“80亿时代”&#xff01;人口数量增加&#xff0c;从一个侧面反映了人类文明的进步。此外&#xff0c;随着人类预期寿命增加&#xff0c;加上生育率下降&#xff0c;将加剧全球人口老龄化…

基于模型的聚类和R语言中的高斯混合模型

介绍 四种最常见的聚类方法模型是层次聚类&#xff0c;k均值聚类&#xff0c;基于模型的聚类和基于密度的聚类 . 最近我们被客户要求撰写关于聚类的研究报告&#xff0c;包括一些图形和统计输出。 可以基于两个主要目标评估良好的聚类算法&#xff1a; 高组内相似性低组间相…

小程序中的自定义组件以及组件通信、数据共享、插槽、behaviors

一、创建组件和使用自定义组件 1.创建组件 ①在项目的根目录中&#xff0c;鼠标右键&#xff0c;创建components -> 文件夹 ②在新建的components -> 文件夹上,鼠标右键&#xff0c;点击“新建Component ③键入组件的名称之后回车&#xff0c;会自动生成组件对应的4个…

卡尔曼滤波Kalman Filtering:介绍

本文是Quantitative Methods and Analysis: Pairs Trading此书的读书笔记。 控制理论(control theory&#xff09;是工程学的分支之一&#xff0c;主要应对工程系统控制的问题。比如控制汽车发动机的功率输出&#xff0c;稳定电动机的转速&#xff0c;控制“反应速率”&#x…

企业数字化办公利器——华为云桌面Workspace

随着云办公生态的逐渐成熟&#xff0c;华为云桌面也成为了越来越多企业实现随时随地移动办公的选择。 华为云桌面Workspace是一款SAAS产品&#xff0c;是基于华为云云原生架构设计和构建的云桌面服务&#xff0c;可支持云桌面的快速创建、部署和集中运维管理&#xff0c;免除大…

2023年java代做题目参考整理

为方便毕业设计选题&#xff0c;特别整理以下几百题目供参考 班级风采网站的设计 工资绩效管理系统的开发 电子产品销售网站的设计与实现 酒店预订信息管理系统的设计 成绩管理系统 B2C的电子商务系统(J2EE) B2C购物网站设计 教学网站及网上考试系统的设计与实现 ERP采…

STM32学习之Keil5软件配置

前言&#xff1a;代码编写环境可以让编写者在代码编写上有一定的好处&#xff0c;从而得到高效的代码编写。本次笔者写的是一些市面上常用的嵌入式开发软件Keil5&#xff0c;在初始化使用软件界面需要进行配置的。主要分为五大部分&#xff08;文本美化、代码编辑技巧、查找和替…

一篇图解Linux内存碎片整理

我们知道物理内存是以页为单位进行管理的&#xff0c;每个内存页大小默认是4K&#xff08;大页除外&#xff09;。申请物理内存时&#xff0c;一般都是按顺序分配的&#xff0c;但释放内存的行为是随机的。随着系统运行时间变长后&#xff0c;将会出现以下情况&#xff1a; 要解…

树莓派板载蓝牙使用

1 设置树莓派板载蓝牙 1.1 相关环境安装、配置 sudo apt-get update sudo apt-get install pi-bluetooth bluez bluez-firmware blueman1.2 树莓派蓝牙操作 参考&#xff1a; https://blog.csdn.net/guzhong10/article/details/78574577 有时候会失败&#xff0c; 可以尝试…

[附源码]SSM计算机毕业设计学校缴费系统JAVA

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

文件包含总结

概念 是指将已有的代码以文件形式包含到某个指定的代码中&#xff0c;从而使用其中的代码或者数据&#xff0c;一般是为了方便直接调用所需文件&#xff0c;文件包含的存在使得开发变得更加灵活和方便。 文件包含常见函数 include() // 执行到include时才包含文件&#xff…

区间信息维护与查询【线段树 】 - 原理2 线段树中的“懒操作”

区间信息维护与查询【线段树 】 - 原理2 线段树中的“懒操作” 之前我们已经说了对线段树的点更新和区间查询&#xff0c;若要求对区间中的所有点都进行更新&#xff0c;该怎么办&#xff1f; 若对区间的每个点都进行更新&#xff0c;则时间复杂度较高&#xff0c;可以引入懒…

Cocos2d-x 3D渲染技术 (三)

包围盒算法 说白了就是给物体装进一个盒子里&#xff0c;该盒子可以装下物体。目的是为了进行碰撞检测。 种类&#xff1a; 球状碰撞体立方体碰撞体胶囊碰撞体Mesh碰撞体 实现原理是OBB包围盒。 经常使用的两种碰撞算法是OBB包围盒和AABB包围盒算法。 OBB包围盒算法 方向…

JavaScript -- 01. 基础语法介绍

文章目录基础语法1 Hello World2 JS的编写位置3 基本语法3.1 多行注释3.2 单行注释3.3 区分大小写3.4 空格和换行会被忽略3.5 以分号结尾3.6 字面量3.7 变量3.8 变量的内存结构3.9 常量3.10 标识符基础语法 JS的基本语法 1 Hello World js的三种输出方式 <!DOCTYPE html&g…

精彩回顾 | 云原生系统软件的产业应用

11月18日&#xff0c;2022年第五届中国金融科技产业大会暨第四届中新&#xff08;苏州&#xff09;数字金融应用博览会“基础软件与云原生系统软件”分论坛成功举办。该论坛由由中国计算机学会CTO CLUB&#xff08;苏州&#xff09;承办&#xff0c;江苏省金融科技云原生融合创…

如何用 Python 做一个简单的翻译工具?

前言 平时经常在网上翻译一些单词&#xff0c;突发奇想&#xff0c;可不可以直接调某些免费翻译网站的接口呢&#xff1f;然后做一个图形界面的翻译小工具&#xff1f;下面开始实践 &#xff08;文末送读者福利&#xff09; 1.先找一下有哪些免费翻译的接口 百度了一下关键字…

神经架构搜索的综合调查:挑战和解决方案(二)

4 PERFORMANCE COMPARISON NAS 是一项很有前途的研究。在本节中&#xff0c;我们根据主流搜索方法 [27, 28] 对现有 NAS 的性能进行分类和比较&#xff0c;同时还根据第 3 节报告了它们使用的优化策略。这些搜索方法主要包括以下内容&#xff1a;强化学习&#xff08;RL&#…

操作系统学习笔记(Ⅲ):内存

目录 1 内存管理 1.1 内存基础知识 1.内存 2.进程运行 1.2 内存管理的概念 1.3 覆盖与交换 1.覆盖 2.交换 3.区别 1.4 连续分配管理方式 1.单一连续分配 2.固定分区分配 3.动态分区分配 1.5 动态分区分配算法 1.首次适应算法 2.最佳适应算法 3.最坏适应算法 …