RabbitMQ知识总结一

news2024/11/24 15:57:45

更多知识在我的语雀平台:
https://www.yuque.com/ambition-bcpii/muziteng

RabbitMQ

1. RabbitMQ引言

1.1 什么是MQ

MQ(Message Queue)消息队列,是基础数据结构中“先进先出”的一种数据结构。一般用来解决应用解耦,异步消息,流量削峰等问题,实现高性能,高可用,可伸缩和最终一致性架构。

1.2 MQ有哪些

主要的MQ产品包括:RabbitMQ、ActiveMQ、RocketMQ、ZeroMQ、Kafka、IBM WebSphere 等。

市面上比较火爆的几款MQ:ActiveMQ,RocketMQ,Kafka,RabbitMQ。

  • 语言的支持:ActiveMQ,RocketMQ只支持Java语言,Kafka可以支持多们语言,RabbitMQ支持多种语言。
  • 效率方面:ActiveMQ,RocketMQ,Kafka效率都是毫秒级别,RabbitMQ是微秒级别的。
  • 消息丢失,消息重复问题: RabbitMQ针对消息的持久化,和重复问题都有比较成熟的解决方案。
  • 学习成本:RabbitMQ非常简单。

RabbitMQ是由Rabbit公司去研发和维护的,最终是在Pivotal公司维护。

RabbitMQ严格的遵循AMQP协议,一种高级消息队列协议,帮助我们在进程之间传递异步消息。

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等

1.3 不同MQ的特点

特性ActiveMqRabbitMqRocketMQKafka
成熟度成熟成熟比较成熟成熟的日志领域
时效性微秒级毫秒级毫秒级
社区活跃度
单机吞吐量万级,吞吐量比RocketMQ和Kafka要低了一个数量级万级,吞吐量比RocketMQ和Kafka要低了一个数量级10万级,RocketMQ也是可以支撑高吞吐的一种MQ10万级别,这是kafka最大的优点,就是吞吐量高。一般配合大数据类的系统来进行实时数据计算、日志采集等场景
topic数量对吞吐量的影响topic可以达到几百,几千个的级别,吞吐量会有较小幅度的下降这是RocketMQ的一大优势,在同等机器下,可以支撑大量的topictopic从几十个到几百个的时候,吞吐量会大幅度下降所以在同等机器下,kafka尽量保证topic数量不要过多。如果要支撑大规模topic,需要增加更多的机器资源
可用性高,基于主从架构实现高可用性高,基于主从架构实现高可用性非常高,分布式架构非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消息可靠性有较低的概率丢失数据经过参数优化配置,可以做到0丢失经过参数优化配置,消息可以做到0丢失
功能支持MQ领域的功能极其完备基于erlang开发,所以并发能力很强,性能极其好,延时很低MQ功能较为完善,还是分布式的,扩展性好功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用,是事实上的标准
优劣势总结非常成熟,功能强大,在业内大量的公司以及项目中都有应用偶尔会有较低概率丢失消息而且现在社区以及国内应用都越来越少,官方社区现维护越来越少,几个月才发布一个版本而且确实主要是基于解耦和异步来用的,较少在大规模吞吐的场景中使用erlang语言开发,性能极其好,延时很低;吞吐量到万级,MQ功能比较完备而且开源提供的管理界面非常棒,用起来很好用社区相对比较活跃,几乎每个月都发布几个版本分在国内一些互联网公司近几年用rabbitmq也比较多一些但是问题也是显而易见的,RabbitMQ确实吞吐量会低一些,这是因为他做的实现机制比较重。而且erlang开发,国内有几个公司有实力做erlang源码级别的研究和定制?如果说你没这个实力的话,确实偶尔会有一些问题,你很难去看懂源码,你公司对这个东西的掌控很弱,基本职能依赖于开源社区的快速维护和修复bug。而且rabbitmq集群动态扩展会很麻烦,不过这个我觉得还好。其实主要是erlang语言本身带来的问题。很难读源码,很难定制和掌控。接口简单易用,而且毕竟在阿里大规模应用过,有阿里品牌保障日处理消息上百亿之多,可以做到大规模吞吐,性能也非常好,分布式扩展也很方便,社区维护还可以,可靠性和可用性都是ok的,还可以支撑大规模的topic数量,支持复杂MQ业务场景而且一个很大的优势在于,阿里出品都是java系的,我们可以自己阅读源码,定制自己公司的MQ,可以掌控社区活跃度相对较为一般,不过也还可以,文档相对来说简单一些,然后接口这块不是按照标准JMS规范走的有些系统要迁移需要修改大量代码还有就是阿里出台的技术,你得做好这个技术万一被抛弃,社区黄掉的风险,那如果你们公司有技术实力我觉得用RocketMQ挺好的kafka的特点其实很明显,就是仅仅提供较少的核心功能,但是提供超高的吞吐量,ms级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展同时kafka最好是支撑较少的topic数量即可,保证其超高吞吐量而且kafka唯一的一点劣势是有可能消息重复消费,那么对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略这个特性天然适合大数据实时计算以及日志收集

1.4 RabbitMQ介绍

官网:https://rabbitmq.com/

RabbitMQ is the most widely deployed open source message broker.

为什么最受欢迎,应用最广泛?

  1. 使用AMQP协议 支持很多业务场景 比如 点对点 交换机路由 发布订阅模式 能适用很多业务场景
  2. 使用 erlang 语言 这个语言的特点 叫做面向并发编程 自身并发能力强 对socket 编程 支持友好
  3. 和spring 无缝整合
  4. 对数据一致性 数据丢失 错误处理 非常友好 可以不丢失任何数据 对错误数据恢复

2. RabbitMQ安装

2.1 下载

官网下载地址:https://www.rabbitmq.com/download.html

这里我们使用Linux环境下进行下载,并且使用docker-compose进行一键安装

使用 docker-compose 安装 启动服务 进入服务内部 启动web 访问

cd /opt
mkdir docker_rabbitmq
cd docker_rabbitmq/
vim docker-compose.yml
# -d 后台作为守护进程启动
docker-compose up -d

docker-compose.yml 文件内容如下

version: "3.1"
services:
  rabbitmq:
    image: daocloud.io/library/rabbitmq:management
    restart: always
    container_name: rabbitmq
    ports:
      - 5672:5672     #rabbitmq  服务的端口号
      - 15672:15672   # rabbitmq 图形化界面的端口号
    volumes:
      - ./data:/var/lib/rabbitmq

打开浏览器:http://xxx.xxx.xxx.xxx:15672 用户名 guest 密码 guest

有防火墙得记得打开防火墙,并且开放端口

image-20221218192704100

2.2 RabbitMQ架构

2.2.1 官方的简单架构图

  • Publisher - 生产者:发布消息到RabbitMQ中的Exchange
  • Consumer - 消费者:监听RabbitMQ中的Queue中的消息
  • Exchange - 交换机:和生产者建立连接并接收生产者的消息
  • Queue - 队列:Exchange会将消息分发到指定的Queue,Queue和消费者进行交互
  • Routes - 路由:交换机以什么样的策略将消息发布到Queue

image-20221218192809981

2.2.2 RabbitMQ的完整架构图

image-20221218192831967

image-20221218192856970

image-20221218192908815

image-20221218192927319

查看图形化界面并创建一个Virtual Host

创建一个全新test用户,全新的Virtual Host,并且将test用户设置上可以操作/test的权限

image-20221218193037491

image-20221218193109551

3. RabbitMQ的使用

3.1 RabbitMQ的通讯方式

image-20221218193223103

image-20221218193240748

image-20221218193248106

image-20221218193307296

3.2 Java连接RabbitMQ

导入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

创建连接工具类

public class RabbitMqUtil {
    public static Connection getConnection() {
        // 创建连接mq 的连接工厂对象
        ConnectionFactory factory = new ConnectionFactory();
        // 设置 rabbitmq 的主机
        factory.setHost("xxx.xxx.xxx.xxx");
        // 设置端口号
        factory.setPort(5672);
        // 设置用户名
        factory.setUsername("test");
        // 设置密码
        factory.setPassword("test");
        // 设置连接哪个虚拟机
        factory.setVirtualHost("/test");
        // 创建Connection
        Connection conn = null;
        try {
            conn = factory.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }
        // 返回
        return conn;
    }
}

3.3 Hello-World

一个生产者,一个默认的交换机,一个队列,一个消费者

image-20221218193552981

创建生产者,创建一个channel,发布消息到默认exchange,指定路由规则。

创建消费者,创建一个channel,创建一个队列,并且去消费当前队列

P:生产者 也就是要发送消息的程序

C:消费者 消息的接受者 会一直等待消息的到来

queue:消息队列 图中红色部分 类似一个邮箱 可以缓存消息 生产者向其中投递消息 消费者从其中取消息

public class HelloWorldTest {

    /**
     * 队列名称
     */
    private static final String QUEUE_NAME = "hello";


    /**
     * 生产者发布消息到队列
     */
    @Test
    public void publish() throws Exception {
        // 1. 获取 Connection
        Connection connection = RabbitMqUtil.getConnection();
        // 2. 创建Channel
        Channel channel = connection.createChannel();
        // 3. 发布消息到 exchange,同时指定路由规则
        String msg = "Hello-World222";
        // 发布消息
        // 参数1:指定 exchange 交换机,当前模式是 生产者---队列---消费者 模式
        // 参数2:指定路由的规则,使用具体的队列名称    队列名称
        // 参数3:指定传递消息所携带的 properties,使用 null
        // 比如:MessageProperties.PERSISTENT_BASIC 表示持久化消息
        // 参数4:指定发布的具体消息,byte[]类型
        // 第一个参数是 exchangeName (默认情况下代理服务器端是存在一个""名字的 exchange 的,
        // 因此如果不创建 exchange 的话我们可以直接将该参数设置成 "",如果创建了 exchange 的话,
        // 我们需要将该参数设置成创建的 exchange 的名字),第二个参数是路由键
        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
        // Ps: exchange 是不会帮你将消息持久化到本地的,Queue 才会帮你持久化消息
        System.out.println("生产者发布消息成功!");
        // 4. 释放资源
        channel.close();
        connection.close();
    }

    /**
     * 消费者消费消息
     */
    @Test
    public void consume() throws Exception {
        // 1. 获取连接对象
        Connection connection = RabbitMqUtil.getConnection();
        // 2. 创建 channel
        Channel channel = connection.createChannel();
        // 3. 管道绑定队列
        // 参数1:queue - 指定队列的名称
        // 参数2:durable - 当前队列是否需要持久化
        //参数3:exclusive - 是否排外conn.close()-当前队列会被自动删除,当前队列只能被一个消费者消费
        //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
        //参数5:arguments - 指定当前队列的其他信息
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        /**
         * DefaultConsumer 是  Consumer 接口的实现类    接口中的定义的方法如下 这个不作为重点   了解即可
         * handleCancel:除了调用basicCancel的其他原因导致消息被取消时调用。
         * handleCancelOk:basicCancel调用导致的订阅取消时被调用。
         * handleConsumeOk:任意basicComsume调用导致消费者被注册时调用。
         * handleDelivery:消息接收时被调用。
         * handleRecoverOk:basic.recover-ok被接收时调用
         * handleShutdownSignal:当Channel与Conenction关闭的时候会调用。
         */
        // 参数1:queue - 指定消费哪个队列
        // 参数2:autoAck - 指定是否自动ACK 开启自动确认机制 (true,接收到消息后,会立即告诉RabbitMQ)
        // 参数3:consumer - 消费回调接口
        // 4. 设置一个回调,消费队列中的消息
        // 简易版自定义 Consumer
        // 只需要重写 DefaultConsumer 的 handleDelivery 方法即可取出消息,额外属性新增属性等操作
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息:" + new String(body, StandardCharsets.UTF_8));
            }
        });
        System.out.println("消费者开始监听队列!");
        // 不要让消费者线程结束 否则看不到监听效果了   如果没有这行代码  下面不要关闭通道和连接
        System.in.read();
        // 5. 释放资源
        channel.close();    // 不建议关闭 通道和来连接
        connection.close();
    }
}

点对点的模型应用场景:

比如在注册的时候发送短信验证就可以以消息队列的形式 调用短信服务接口

再比如签到送积分的功能 可以用此模型向积分服务发送请求

HelloWorld模型总结:生产者发送消息到消息队列,消费者监听消息队列消息的变化,一旦有消息,消费者就去消费,这种模型比较简单,一个生产者对应一个消费者,也是一些简单的业务用的比较多的场景,消息队列在中间就类似一个邮箱一个缓存,生产者把消息发送到消息队列,被消费者监听到就去处理消息依次来完成对应的业务操作。

这种模型是最简单的模型,可能应对不了某些特殊的场景。比如消费者在处理某些消息时因为业务逻辑的复杂或者消费者处理过慢,会造成消息队列中的消息不断造成消息的堆积,所以我们希望生产者生产的消息可以给更多的消费者消费,这样就提高了消费者处理消息的 效率 ,当然我们要保证 不同的消费者处理的消息是不同的 不然会造成业务的重复处理, 这样 就可以处理消息快一些不堵塞消息队列。基于这种需求就是下面的Work模型。

3.4 Work

一个生产者,一个默认的交换机,一个队列,两个消费者

work queues 也称为task queues ,任务模型 。

HelloWorld 模型的不足 :当消息处理比较耗时时,可能生产消息的速度会远远大于消息的消费速度,长此以往消息堆积的越来越多,无法及时处理此时就可以考虑work模型,让多个消费者绑定到同一个队列,共同消费队列中的消息。队列中的消息 一旦被消费就会消失因此任务是不会被重复执行的。

image-20221218194039864

只需要在消费者端,添加Qos能力以及更改为手动ack即可让消费者,根据自己的能力去消费指定的消息,而不是默认情况下由RabbitMQ平均分配了,生产者不变,正常发布消息到默认的exchange,并指定routing。

P:生产者 消息的发送者

C1:消费者 领取任务 并完成任务

C2: 消费者2 领取任务并完成任务

queue: 红色部分队列

Qos: (Quality of Service) QoS 是消息的发送方(Sender)和接受方(Receiver)之间达成的⼀个协议

ACK (Acknowledge character)即是确认字符,在数据通信中,接收站发给发送站的一种传输类控制字符。表示发来的数据已确认接收无误。

/**
 * Work 模型: 按劳分配  能者多劳
 * 现在有两个消费者:消费者1处理消息处理的慢2秒一个
 * 消费者2处理消息处理的快1秒一个,但是在自动确认模式下
 * channel.basicConsume("Work",true,consumer); Autoack  true,生产者发送了100条消息 会一下子全被接受 在web页面看不到被消费的过程
 * 而且消费者轮流依次消费 没有出现谁消费的快 就多消费一点的情况
 * 如果改成 手动确认模式生产者发送的 100 条消息会逐渐被消费 在web页面能看到被消费的过程而且哪个消费者消费的快就会多消费
 * 实际场景 我们希望 能者多劳 处理消息快的消费者 多处理一些
 * 所以不建议使用消息的自动确认应该改为手动确认
 */
public class WorkTest {

    private static final String QUEUE_NAME = "Work";

    /**
     * 生产者
     */
    @Test
    public void publish() throws Exception {
        // 1. 获取Connection
        Connection connection = RabbitMqUtil.getConnection();
        // 2. 创建Channel
        Channel channel = connection.createChannel();
        // 3. 发布消息到exchange,同时指定路由的规则
        String msg = "Hello-Work!";
        // 参数1:指定exchange,使用""。
        // 参数2:指定路由的规则,使用具体的队列名称。
        // 参数3:指定传递的消息所携带的properties,使用null。
        // 参数4:指定发布的具体消息,byte[]类型
        for (int i = 0; i < 100; i++) {
            channel.basicPublish("", QUEUE_NAME, null, (i + msg).getBytes(StandardCharsets.UTF_8));
        }
        // Ps:exchange是不会帮你将消息持久化到本地的,Queue才会帮你持久化消息。
        System.out.println("生产者发布消息成功");
        // 4. 释放资源
        channel.close();
        connection.close();
    }

    /**
     * 消费者1 能力较弱 2s处理一个消息
     */
    @Test
    public void consume1() throws Exception {
        // 1. 获取Connection
        Connection connection = RabbitMqUtil.getConnection();
        // 2. 创建Channel
        Channel channel = connection.createChannel();

        // 3. 管道绑定队列
        // 参数1:queue - 指定队列的名称
        // 参数2:durable - 当前队列是否需要持久化(true)
        // 参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
        // 参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
        // 参数5:arguments - 指定当前队列的其他信息
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        // 1. 指定当前消费者,一次消费多少个消息,没有过来的消息,还在队列中保存,这样设置,不会造成消息丢失
        // 因为假如不指定一次消费一条消息就有可能有多条消息到达消费者此时消费者一旦宕机到达消费者的消息也就丢了
        // 所以消息从队列到消费者一次来一条免得过来多条消息在半路丢了
        channel.basicQos(1);     // 不要一次性的把消息都给消费者容易丢失一次给一条安全

        // 4. 设置一个回调,消费队列中的消息 指定手动 ack
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者1号接收到消息:" + new String(body, StandardCharsets.UTF_8));
                // 2. 手动ack
                // 参数1:long类型  标识队列中哪个具体的消息
                // 参数2:boolean 类型 是否开启多个消息同时确认
                channel.basicAck(envelope.getDeliveryTag(), false);
                // 处理完了消息,手动确认一下,队列再删除这个消息;这种机制保证消息永不丢失
                // 队列给消费者 一条消息,消费者收到消息,处理完了之后手动确认,确认了之后,队列才把消息删除,保证消息永不丢失
                // 而且消费者确认一个消息,队列发送一个消息,消费者确认的快,队列发送的快,能者多劳
            }
        };
        // 参数1:queue - 指定消费哪个队列
        // 参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
        // 参数3:consumer - 指定消费回调
        channel.basicConsume(QUEUE_NAME, false, consumer);
        System.out.println("消费者开始监听队列!");
        System.in.read();
        // 5. 释放资源
        channel.close();
        connection.close();
    }

    /**
     * 消费者2 能力较强 1s处理一个消息
     */
    @Test
    public void consume2() throws Exception {
        Connection connection = RabbitMqUtil.getConnection();
        Channel channel = connection.createChannel();
        // 绑定队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        // 指定当前消费者,一次消费多少个消息
        channel.basicQos(1);
        // 设置一个回调,消费队列中的消息 指定手动 ack
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println("消费者2号接收到消息:" + new String(body, StandardCharsets.UTF_8));
                // 手动 ack
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(QUEUE_NAME, false, consumer);
        System.out.println("消费者开始监听队列!");
        System.in.read();
        // 5. 释放资源
        channel.close();
        connection.close();
    }
}

3.5 Publish/Subscribe

fanout 扇出 也称为广播

一个生产者,一个交换机,多个队列,多个消费者

可以有多个消费者,每个消费者都有自己的 queue

每个队列都要绑定到Exchange 交换机

生产者发送消息 只能发送到交换机 交换机决定要发给哪个队列 生产者无法决定

交换机把消息发送给绑定过得所有队列

队列的消费者都能拿到消息,实现一条消息被多个消费者消费

image-20221218194822556

声明一个Fanout类型的exchange,并且将exchange和queue绑定在一起,绑定的方式就是直接绑定。

让生产者创建一个exchange并且指定类型,和一个或多个队列绑定到一起。

消费者还是正常的监听某一个队列即可。

使用场景:比如 注册成功 发送一个消息 短信服务 邮件服务 积分服务 这些服务 作为消费者

来消费接受这个消息 生产实践用的也较多

public class PubSubTest {

    private static final String EXCHANGE_NAME = "pubsub-exchange";

    @Test
    public void publish() throws Exception {
        Connection connection = RabbitMqUtil.getConnection();
        Channel channel = connection.createChannel();
        // Ps:exchange是不会帮你将消息持久化到本地的,Queue才会帮你持久化消息。
        // 创建交换机 - 绑定某一个队列
        // 参数1: exchange  交换机的名称
        // 参数2: 指定exchange  交换机的类型
        // FANOUT - pubsub  广播类型 ,   DIRECT - Routing , TOPIC - Topics
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        // 第一次 发布消息 到 交换机
        // 广播模式下路由key 是没有用的 routingKey 没有意义  所以空着不写
        String msg = "Hello-PubSub!";
        channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes(StandardCharsets.UTF_8));
        // 第二次 发布消息 到 交换机
        channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes(StandardCharsets.UTF_8));
        System.out.println("生产者发布消息成功!");
        // 释放资源
        channel.close();
        connection.close();
    }

    @Test
    public void consume1() throws Exception {
        Connection connection = RabbitMqUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        // 创建临时队列
        String queueName = channel.queueDeclare().getQueue();
        // 绑定交换机和队列
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        // 指定当前消费者一次消费多少消息
        channel.basicQos(1);
        // 设置一个回调,消费队列中的消息 指定手动 ack
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println("消费者1号接收到消息:" + new String(body, StandardCharsets.UTF_8));
                // 手动 ack
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(queueName, false, consumer);
        System.out.println("消费者开始监听队列!");
        System.in.read();
        // 释放资源
        channel.close();
        connection.close();
    }

    @Test
    public void consume2() throws Exception {
        Connection connection = RabbitMqUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        // 获取临时队列
        String queueName = channel.queueDeclare().getQueue();
        // 绑定交换机和队列
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        // 设置消费者一次消费多少消息
        channel.basicQos(1);
        // 设置回调 进行消费
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者2号接收到消息:" + new String(body, StandardCharsets.UTF_8));
                // 手动 ack
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(queueName, false, consumer);
        System.out.println("消费者开始监听队列!");
        System.in.read();
        // 释放资源
        channel.close();
        connection.close();
    }
}

3.6 Routing

Routing之订阅模型-Direct

在Fanout模式中,一条消息 会被所有订阅的队列都消费。但是在某些场景下 我们希望不同的消息被不同的队列消费。这时 就要用到Direct 类型的Exchange

在Direct模型下:

1.队列和交换机的绑定 不能是任意绑定了 ,而是要指定一个RoutingKey (路由key)

2.消息的发送方在向Exchange 发送消息时 ,也必须指定消息的RoutingKey

3.Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key 进行判断,只有队列的RoutingKey 与消息的RoutingKey 完全一致 ,才会接收到消息

image-20221218195041056

P:生产者 向Exchange 发送消息 发送消息时 会指定一个 RoutingKey

X:Exchange (交换机) 接受生产者的消息 然后把消息传递给 与RoutingKey 完全匹配的队列

C1 消费者 其所在队列指定了需要RoutingKey 为error 的消息

C2 消费者 其所在队列 指定了需要RoutingKey 为 info error 的消息

生产者在创建DIRECT类型的exchange后,根据RoutingKey去绑定相应的队列,并且在发送消息时,指定消息的具体RoutingKey即可。

消费者没有变化

public class RoutingTest {

    /**
     * 交换机名称
     */
    private static final String EXCHANGE_NAME = "routing-exchange";

    /**
     * Routing-key
     */
    private static final String ERROR_ROUTING_KEY = "ERROR";
    private static final String INFO_ROUTING_KEY = "INFO";


    @Test
    public void publish() throws Exception {
        Connection connection = RabbitMqUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        // 发布消息到exchange,同时指定路由的规则
        channel.basicPublish(EXCHANGE_NAME, ERROR_ROUTING_KEY, null, "ERROR".getBytes(StandardCharsets.UTF_8));
        channel.basicPublish(EXCHANGE_NAME, INFO_ROUTING_KEY, null, "INFO1".getBytes(StandardCharsets.UTF_8));
        channel.basicPublish(EXCHANGE_NAME, INFO_ROUTING_KEY, null, "INFO2".getBytes(StandardCharsets.UTF_8));
        channel.basicPublish(EXCHANGE_NAME, INFO_ROUTING_KEY, null, "INFO3".getBytes(StandardCharsets.UTF_8));
        System.out.println("生产者发布消息成功");
        channel.close();
        connection.close();
    }

    @Test
    public void consume1() throws Exception {
        Connection connection = RabbitMqUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        // 获取临时队列
        String queueName = channel.queueDeclare().getQueue();
        // 基于路由key绑定队列和交换机
        channel.queueBind(queueName, EXCHANGE_NAME, ERROR_ROUTING_KEY);
        // 指定当前消费者一次消费多少消息
        channel.basicQos(1);
        // 设置回调
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者1号接收到消息:" + new String(body, StandardCharsets.UTF_8));
                // 手动 ack
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(queueName, false, consumer);
        System.out.println("消费者开始监听队列!");
        System.in.read();
        // 释放资源
        channel.close();
        connection.close();
    }

    @Test
    public void consume2() throws Exception {
        Connection connection = RabbitMqUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        // 获取临时队列
        String queueName = channel.queueDeclare().getQueue();
        // 基于路由key绑定交换机和队列
        channel.queueBind(queueName, EXCHANGE_NAME, INFO_ROUTING_KEY);
        channel.queueBind(queueName, EXCHANGE_NAME, ERROR_ROUTING_KEY);
        // 设置消费者一次消费多少消息
        channel.basicQos(1);
        // 设置回调
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者2号接收到消息:" + new String(body, StandardCharsets.UTF_8));
                // 手动 ack
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(queueName, false, consumer);
        System.out.println("消费者开始监听队列!");
        System.in.read();
        // 释放资源
        channel.close();
        connection.close();
    }
}

3.7 Topic

Routing之订阅模型-Topic

Topic类型的Exchange与Direct相比,都是可以根据RoutingKey 把消息路由到不同的队列,只不过Topic 类型的Exchange可以让队列在绑定RoutingKey 的时候使用通配符!这种模型RoutingKey 一般都是由一个或多个单词组成,多个单词之间以 “.” 分割, 例如 item.insert

image-20221218195200209

#通配符 
    * (star) can substitute  for exactly  one word .     匹配不多不少恰好一个词
    #  (hash)  can  substitute for zero  or more words .  匹配一个或多个词 
# 如:
	audit.#      匹配  audit.irs.corporate   或者  audit.irs  等
	audit.*      只能匹配   audit.irs    

生产者创建Topic的exchange并且绑定到队列中,这次绑定可以通过*和#关键字,对指定RoutingKey内容,编写时注意格式 xxx.xxx.xxx去编写, * -> 一个xxx,而# -> 代表多个xxx.xxx,在发送消息时,指定具体的RoutingKey到底是什么。

消费者只是监听队列,没变化。

public class TopicTest {

    /**
     * 交换机名称
     */
    private static final String EXCHANGE_NAME = "topic-exchange";

    /**
     * *  匹配不多不少恰好一个词
     * #  匹配一个或多个词
     */
    private static final String ROUTING_KEY1 = "*.red.*";
    private static final String ROUTING_KEY2 = "fast.#";
    private static final String ROUTING_KEY3 = "*.*.rabbit";

    @Test
    public void publish() throws Exception {
        Connection connection = RabbitMqUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        // 发布消息到exchange,同时指定路由的规则
        channel.basicPublish(EXCHANGE_NAME, "fast.red.monkey", null, "红快猴子".getBytes());
        channel.basicPublish(EXCHANGE_NAME, "slow.black.dog", null, "黑慢狗".getBytes());
        channel.basicPublish(EXCHANGE_NAME, "fast.white.cat", null, "快白猫".getBytes());
        System.out.println("生产者发布消息成功!");
        // 释放资源
        channel.close();
        connection.close();
    }

    @Test
    public void consume1() throws Exception {
        Connection connection = RabbitMqUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        // 创建临时队列
        String queueName = channel.queueDeclare().getQueue();
        // 绑定队列和交换机
        channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY1);
        channel.basicQos(1);
        // 设置回调
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者1号接收到消息:" + new String(body, StandardCharsets.UTF_8));
                // 手动 ack
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(queueName, false, consumer);
        System.out.println("消费者开始监听队列!");
        System.in.read();
        // 释放资源
        channel.close();
        connection.close();
    }

    @Test
    public void consume2() throws Exception {
        Connection connection = RabbitMqUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        // 创建临时队列
        String queueName = channel.queueDeclare().getQueue();
        // 绑定队列和交换机
        channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY2);
        channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY3);
        channel.basicQos(1);
        // 设置回调
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者2号接收到消息:" + new String(body, StandardCharsets.UTF_8));
                // 手动 ack
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(queueName, false, consumer);
        System.out.println("消费者开始监听队列!");
        System.in.read();
        // 释放资源
        channel.close();
        connection.close();
    }
}

4. SpringBoot整合RabbitMQ

4.1 环境

导入依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

编写配置文件

spring:
  rabbitmq:
    host: xxx.xxx.xxx.xxx
    port: 5672
    username: test
    password: test
    virtual-host: /test

4.2 Hello-World

第一种使用配置类定义队列的方式

//===========配置类===========
@Configuration
public class RabbitMQConfig {

    @Bean
    Queue simpleQueue(){
        return new Queue("simpleQueue");
    }
}

//=============消费者=============
@Component
public class HelloConsume {

    // 在配置类中创建queue在这里引用即可从simpleQueue队列中消费消息
    // @RabbitListener也可以在类上使用
    @RabbitListener(queues = "simpleQueue")
    public void receive(String msg){
        System.out.println("消费者接收到的消息是:" + msg);
    }
}

//==========生产者========
@SpringBootTest
class SpringbootMqApplicationTests {

    // 注入rabbitTemplate
    @Autowired
    RabbitTemplate rabbitTemplate;

    // 生产者到simpleQueue对列发布消息
    @Test
    void contextLoads() {
        rabbitTemplate.convertAndSend("simpleQueue","SpringBoot整合MQ发送的消息");
    }
}

第二种使用注解的方式定义队列

//=========消费者==========
@Component
public class HelloConsume {

    // 使用queuesToDeclare声明队列并从这个队列中消费消息
    @RabbitListener(queuesToDeclare = @Queue(name = "simpleQueue"))
    public void receive(String msg){
        System.out.println("消费者接收到的消息是:" + msg);
    }
}

//===========生产者============
@SpringBootTest
class SpringbootMqApplicationTests {

    // 注入rabbitTemplate
    @Autowired
    RabbitTemplate rabbitTemplate;

    // 生产者到simpleQueue对列发布消息
    @Test
    void contextLoads() {
        rabbitTemplate.convertAndSend("simpleQueue","SpringBoot整合MQ发送的消息");
    }
}

4.3 Work

// 生产者
@Test
public void testWork(){
    // Work 模型
    for(int i=0;i<20;i++){
        rabbitTemplate.convertAndSend("work","work模型:   "+i);
    }
}
-----------------------------------------------------------------------------
@Component
public class WorkConsumer {

    @RabbitListener(queuesToDeclare = @Queue(name = "work",durable = "false"))
    public void getMessage(Object message){
        System.out.println("接收到消息1:" + message);
    }

    @RabbitListener(queuesToDeclare = @Queue(name = "work",durable = "false"))
    public void getMessage2(Object message){
        System.out.println("接收到消息2:" + message);
    }
}

4.4 Pub/Sub

// 生产者
@Test
public void testFanout(){
    // 生产发布模型 广播模型
    rabbitTemplate.convertAndSend("boot-pubsub-exchange","","广播模式");
}
  -----------------------------------------------------------
// 消费者
@Component
public class PubSubConsumer {
      @RabbitListener(bindings = {
      @QueueBinding(value = @Queue,   // 创建临时队列
       exchange = @Exchange(value = "boot-pubsub-exchange",type ="fanout")) // 绑定的交换机
      })
      public void getMessage(Object message){
          System.out.println("消费者1:"+message);
      }
    
      @RabbitListener(bindings = {
      @QueueBinding(value = @Queue,   // 创建临时队列
      exchange = @Exchange(value = "boot-pubsub-exchange",type ="fanout")) // 绑定的交换机
      })
      public void getMessage2(Object message){
          System.out.println("消费者2:"+message);
      }
}

4.5 route

// 生产者
@Test
public void testRouting(){
    // 路由模式
    rabbitTemplate.convertAndSend("boot-route-exchange","info","发送的是info的key的路由信息");
}
// ------------------------------------------------------
// 消费者
@Component
public class RouteConsumer {

    @RabbitListener(bindings = {
    @QueueBinding(value = @Queue,    // 创建临时队列
     exchange = @Exchange(value = "boot-route-exchange",type = "direct"),
                  key = {"info","error"})})
    public void getMessage1(Object message){
        System.out.println("消费者1:"+message);
    }

    @RabbitListener(bindings = {
    @QueueBinding(value = @Queue,    // 创建临时队列
    exchange = @Exchange(value = "boot-route-exchange",type = "direct"),key = {"info"})})
    public void getMessage2(Object message){
        System.out.println("消费者2:"+message);
    }
}   

4.6 topic

// 生产者
@Test
void testTopic2(){
    //rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","红色大狼狗!!");
    rabbitTemplate.convertAndSend("boot-topic-exchange","black.dog.and.cat","黑色狗和猫!!");
}
//消费者
@Component
public class TopicConsumer {

    @RabbitListener(
            bindings = {
                    @QueueBinding(
                            value = @Queue,
                            exchange = @Exchange(value = "boot-topic-exchange",type = "topic"),
                            key = {"*.red.*","black.*.#"}
                    )
            }
    )
    public void getMessage1(Object message){
        System.out.println("接收到消息1:" + message);
    }

    @RabbitListener(
            bindings = {
                    @QueueBinding(
                            value = @Queue,
                            exchange = @Exchange(value = "boot-topic-exchange",type = "topic"),
                            key = {"black.*.#"}
                    )
            }
    )
    public void getMessage2(Object message){
        System.out.println("接收到消息2:" + message);
    }
}

4.7 手动ack

要在消息消费完之后才告诉 rabbitmq 这个消息消费了,而不是还没消费就确认。

避免消息消费失败了但是消息已经被自动确认了 那么这个消息就相当于丢了 即丢消息

实现步骤:

1.在yml 配置文件指定手动配置

spring:
  rabbitmq:
    host: xxx.xxx.xxx.xxx
    port: 5672
    username: test
    password: test
    virtual-host: /test
    listener:
      simple:
        acknowledge-mode: manual    #  手动指定 ack
  1. 在消费者的方法参数中指定参数
@RabbitListener(
       bindings = {
               @QueueBinding(
                       value = @Queue,
                       exchange = @Exchange(value = "boot-topic-exchange",type = "topic"),
                       key = {"black.*.#"}
               )
       }
)
public void getMessage3(String msg, Channel channel, Message message) throws IOException {
   System.out.println("接收到消息3:" + msg);

  // 手动  ack
   channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}

5. RabbitMQ的其他操作

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/99477.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

带token的登陆页面爆破方法(burp宏+爬虫脚本分享)

文章目录前言一、token参数分析二、burp设置宏操作三、爬虫脚本四、小结前言 在工作中&#xff0c;会遇到很多登陆页面有token保护&#xff0c;如果用Burpsuite直接抓取数据包并使用爆破模块&#xff0c;则会因token过期导致无法爆破。此时至少可以采用三种办法&#xff1a; 第…

Java诊断工具——arthas,实时监控,了解一下

文章目录1、arthas 简介官方文档2、arthas 的使用场景3、安装&启动3.1 安装3.2 启动4、常用命令5、使用示例5.1 stack5.2 jad5.3 sc5.4 watch5.5 trace5.6 jobs5.7 logger5.8 dashboard5.9 redefine6、其它1、arthas 简介 arthas是由阿里巴巴中间件团队开源的Java诊断工具。…

kubernetes对外服务之Ingress

目录 ​​​​​​​一、Ingress 是什么 1.1Service的作用 1.2Ingress简介 二、Ingress 安装 三、Ingress 代理访问 3.1Ingress HTTP 代理访问 3.2 Ingress: HTTPS 代理访问 3.3Ingress Contronler怎么工作的&#xff1f; ​​​​​​​​​​​​​​一、Ingress 是什…

Java核心实操:内存溢出 实战、内存泄漏实战

文章很长&#xff0c;而且持续更新&#xff0c;建议收藏起来&#xff0c;慢慢读&#xff01;疯狂创客圈总目录 博客园版 为您奉上珍贵的学习资源 &#xff1a; 免费赠送 :《尼恩Java面试宝典》 持续更新 史上最全 面试必备 2000页 面试必备 大厂必备 涨薪必备 免费赠送 经典…

ARM系列之MMU TLB和ASID基础概念介绍。

目录1、为什么要设计TLB&#xff1f;TLB中不包含我们需要的映射关系怎么办&#xff1f;2、TLB中都包含了啥&#xff1f;3、那什么是ASIDAddress Space ID&#xff08;ASID&#xff09;4、小结内存寻址简要过程如下&#xff1a;VA以页表大小取余&#xff0c;得到PA的低位&#x…

Go sync.Pool池化的学习

一句话总结&#xff1a;保存和复用临时对象&#xff0c;减少内存分配&#xff0c;降低 GC 压力。 一.前言 Go 语言标准库也大量使用了 sync.Pool&#xff0c;例如 fmt 和 encoding/json。 1.1 要解决的问题 一个新技术亦或是一个新名词&#xff0c;总是为了解决一些问题才出…

数据挖掘课程设计报告总结

一、实验题目 实验一 Apriori算法设计与应用 二、背景介绍 Apriori算法是一种挖掘关联规则的频繁项集算法&#xff0c;其核心思想是通过候选集生成和向下封闭检测两个阶段来挖掘频繁项集。 三、实验内容 1.3.1 运用的理论知识 关联规则挖掘是数据挖掘中最活跃的研究方法之…

数控恒流源电路简单讲解

&#xff08;1&#xff09;最近课设是做一个可步进的恒流源&#xff0c;所以查查找了很多资料之后。说一下自己对于恒流源电路的简单理解。 &#xff08;2&#xff09;我只是会将怎么使用和调整数据进行讲解&#xff0c;至于为什么这样只会讲我懂的部分。本人知道的也不是很多&…

【RPA前置知识】 整理并总结ForEach Activity类

&#x1f40b;作者简介&#xff1a;博主是一位.Net开发者&#xff0c;同时也是RPA和低代码平台的践行者。 &#x1f42c;个人主页&#xff1a;会敲键盘的肘子 &#x1f430;系列专栏&#xff1a;.Net实用方法总结 &#x1f980;专栏简介&#xff1a;本专栏介绍如何编写 Windows…

Biopython教程

Biopython教程 参考&#xff1a; https://biopython-cn.readthedocs.io/zh_CN/latest/index.html 蛋白质文件获取 Entrez方法 from Bio import Entrez Entrez.email邮箱名 #如123456789qq.com handleEntrez.esearch(dbprotein,term2rbg) recordEntrez.read(handle) idrecor…

C++PrimerPlus 第八章 函数探幽-8.2 引用变量

目录 8.2 引用变量 8.2.1 创建引用变量 8.2.2 将引用用作函数参数 8.2.3 引用的属性和特别之处 8.2.3.1 临时变量、引用参数和const 8.2.4 将引用用于结构 8.2.4.1 程序说明 8.2.4.2 为何要返回引用 8.2.4.3 返回引用时需要注意的问题 8.2.4.4 为何将const用于引用返…

纳米柱阵列超颖表面构建模块的严格分析

摘要 利用先进的制造技术&#xff0c;人们成功实现了具有高数值孔径的可见波长的超透镜。通常使用空间变化的纳米结构作为模块来构建超透镜。在这个例子中分析了用于组成偏振不敏感超透镜的纳米柱状结构。利用傅立叶模态方法&#xff08;FMM&#xff0c;也称为RCWA&#xff09;…

Windows配置开机自启jar包,不显示黑窗口,并输出日志

背景 如果是在 Linux 下开机自启一个服务相对比较简单&#xff0c;这次遇到一个需求是关于 Windows 开机自启的&#xff1a; 在 Windows 环境下开机自动运行一个 SpringBoot 服务&#xff1b;而且由于是一个后台服务&#xff0c;要求对终端用户无感知&#xff1b;为后期维护方…

AC自动机的实现思想与原理

1. 基本介绍 本文最先发布于博客园&#xff0c;原地址&#xff1a;AC自动机的实现与思想原理 - yelanyanyu - 博客园 (cnblogs.com) 1.1案例引入 有一个字典有若干的敏感词 String[] str;&#xff0c;有一个大文章 string&#xff0c;我们要找到大文章中出现的所有的敏感词&…

物联网ARM开发- 6协议 FSMC模拟8080时序驱动LCD(上)

目录 一、常见显示器介绍 1、显示器分类 2、显示器的基本参数 二、TFT-LCD控制原理 1、TFT-LCD结构 2、TFT-LCD控制框图 3、控制原理 LCD数据传输时序 LCD数据传输时序参数 三、SSD1963液晶控制器 1、SSD1963液晶控制器 2、SSD1963内部框图分析 3、8080写时序…

RK3568平台开发系列讲解(音视频篇)FFmpeg公共基础参数

🚀返回专栏总目录 文章目录 一、公共操作部分二、每个文件主要操作部分三、视频操作部分四、音频操作部分沉淀、分享、成长,让自己和他人都能有所收获!😄 📢当我们使用 FFmpeg 时,有一些贯穿 FFmpeg 各个组件的核心参数,在我们查看帮助信息时就可以看到,help 不带参…

基于 Tensorflow 2.x 实现多层卷积神经网络,实践 Fashion MNIST 服装图像识别

一、 Fashion MNIST 服装数据集 Fashion MNIST 数据集&#xff0c;该数据集包含 10 个类别的 70000 个灰度图像。大小统一是 28x28的长宽&#xff0c;其中 60000 张作为训练数据&#xff0c;10000张作为测试数据&#xff0c;该数据集已被封装在了 tf.keras.datasets 工具包下&…

move functions with VS without noexcept

本文所讲对移动函数使用noexcept修饰时带来的效率提升只针对std::vector。而对std::deque来说没有功效。 1. 针对std::vector 1.1 move functions with noexcept 当移动构造函数有noexcept修饰时&#xff0c;在对std::vector进行push_back扩充致使vector的size等于capacity时…

26. GPU以及 没有gpu的情况下使用colab

在PyTorch中&#xff0c;CPU和GPU可以用torch.device(‘cpu’) 和torch.device(‘cuda’)表示。 应该注意的是&#xff0c;cpu设备意味着所有物理CPU和内存&#xff0c; 这意味着PyTorch的计算将尝试使用所有CPU核心。 然而&#xff0c;gpu设备只代表一个卡和相应的显存。 如果…

【大数据技术Hadoop+Spark】Spark SQL、DataFrame、Dataset的讲解及操作演示(图文解释)

一、Spark SQL简介 park SQL是spark的一个模块&#xff0c;主要用于进行结构化数据的SQL查询引擎&#xff0c;开发人员能够通过使用SQL语句&#xff0c;实现对结构化数据的处理&#xff0c;开发人员可以不了解Scala语言和Spark常用API&#xff0c;通过spark SQL&#xff0c;可…