RabbitMq(具体怎么用,看这一篇即可)

news2025/1/11 22:46:36

RabbitMq汇总

  • 1.RabbitMq的传统实现方式
  • 2.SpringAMQP简化RabbitMq开发
    • 2.1 基本消息队列(BasicQueue)
    • 2.2 工作消息队列(WorkQueue)
    • 2.3 发布订阅 -- 广播(Fanout)
    • 2.4 发布订阅 -- 路由(Direct)
    • 2.5 发布订阅 -- 主题(Topic)
  • 2.SpringAMQP声明交换机和队列
    • 2.1 使用bean的方式声明交换机和队列
    • 2.2 使用注解的方式声明交换机和队列

1.RabbitMq的传统实现方式

动手实现一个简单的消息队列
在这里插入图片描述

无论时发布消息还是消费消息,都要建立连接, 所以我们可以将这个步骤抽取出来

public class ConnectionUtil {
    /**
     * 建立与RabbitMQ的连接
     * @return
     * @throws Exception
     */
    public static Connection getConnection() throws Exception {
        // 定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置服务地址
        factory.setHost("192.168.202.128");
        // 端口
        factory.setPort(5672);
        // 设置账号信息,用户名、密码、vhost
        factory.setVirtualHost("/");
        factory.setUsername("itcast");
        factory.setPassword("123321");
        
        // 通过工厂获取连接
        Connection connection = factory.newConnection();
        return connection;
    }
}

一、发布消息

  1. 建立连接
  2. 创建通道
  3. 创建队列
  4. 发送消息
  5. 关闭通道和连接
public class PublisherTest {
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 1.建立连接
        Connection connection = ConnectionUtil.getConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.发送消息
        String message = "hello, rabbitmq!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("发送消息成功:【" + message + "】");

        // 5.关闭通道和连接
        channel.close();
        connection.close();
    }
}

二、订阅消息

  1. 建立连接
  2. 创建通道
  3. 创建队列
  4. 订阅消息
public class ConsumerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.建立连接
        Connection connection = ConnectionUtil.getConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.订阅消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 5.处理消息
                String message = new String(body);
                System.out.println("接收到消息:【" + message + "】");
            }
        });
        System.out.println("等待接收消息。。。。");
    }
}

2.SpringAMQP简化RabbitMq开发

一、引入依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

二、在发布者消费者两端,都要配置MQ地址
SpringAMQP提供了配置来简化手动创建连接这一复杂的过程

spring:
  rabbitmq:
    host: 192.168.202.128 # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: itcast # 用户名
    password: 123321 # 密码

三、简化发送消息
SpringAMQP提供了RabbitTemplate类来简化发送消息的步骤

@Autowired
private RabbitTemplate rabbitTemplate;

四、简化订阅消息
SpringAMQP提供了@RabbitListener注解来简化订阅消息的步骤

@RabbitListener(queues = "simple.queue")
public void listenMessage(String msg) throws InterruptedException {
        
}

2.1 基本消息队列(BasicQueue)

最基本的队列模型:一个生产者发送消息到一个队列,一个消费者从队列中取消息

在这里插入图片描述

实际开发中,我们通常事先在rabbitMq界面创建好队列,然后只要记住队列的名称

一、发布消息
  注入RabbitTemplate来简化操作, RabbitTemplate在执行convertAndSend方法时,会自动开启通道, 往指定名称的队列中发送消息, 并在方法结束后关闭连接和通道

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSimpleQueue() {
        // 队列名称
        String queueName = "simple.queue";
        // 消息
        String message = "hello, spring amqp!";
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

二、订阅消息
使用@RabbitListener注解实现对队列的订阅

@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException {
        System.out.println("spring 消费者接收到消息:【" + msg + "】");
    }
}

总结:
基本消息队列模型中,在生产者和消费者之间,只有队列这一个媒介

  • 生产者只要知道往哪个队列发送消息
  • 消费者只要知道订阅哪个队列中的消息

2.2 工作消息队列(WorkQueue)

在基本消息队列(BasicQueue)中,我们只有一个消费者, 在工作消息队列模型下,我们可以设置多个消费者同时订阅一个队列
在这里插入图片描述
一、发布消息
  实现和基本消息队列时是一样的,只要知道往哪个队列发送消息, 这里我们演示发送多条信息

@Test
public void testWorkQueue() throws InterruptedException {
    // 队列名称
    String queueName = "simple.queue";
    // 消息
    String message = "hello, message_";
    for (int i = 0; i < 20; i++) {
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message + i);
        Thread.sleep(20);
    }
}

二、订阅消息

@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
    System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(20);
}

@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
    System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(200);
}

总结:
工作消息队列模型中,在生产者和消费者之间,只有队列这一个媒介(跟基本模型一样)

  • 生产者只要知道往哪个队列发送消息
  • 消费者只要知道订阅哪个队列中的消息(同一个队列)

2.3 发布订阅 – 广播(Fanout)

广播模式下,引入了一个新的概念:交换机

  交换机是一个消息的中转站, 它可以实现广播、定向、通配符等不同形式的消息递交方式; 交换机只负责递交消息, 并不具备存储消息的能力, 消息最终存储媒介, 依旧是队列, 因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

在这里插入图片描述

一、发布消息
  现在就不是往队列中发消息了, 发送者只要知道往哪个交换机发送消息, 不用去关心交换机将消息转发给哪些队列

@Test
public void testFanoutExchange() {
    // 队列名称
    String exchangeName = "itcast.fanout";
    // 消息
    String message = "hello, everyone!";
    rabbitTemplate.convertAndSend(exchangeName, "", message);
}

  如果你想了解交换机会往哪些队列中发送消息,可以登录rabbitMq的界面,查看交换机详情, 里面会详细罗列当前交换机绑定的队列
在这里插入图片描述

二、订阅消息
消息的最终存储媒介,依旧是队列.消费者只要知道订阅哪个队列中的消息

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
    System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}

@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
    System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}

总结:

  • 生产者只要知道往哪个交换机发送消息, 不用去关心交换机将消息转发给哪些队列
  • 消费者只要知道订阅哪个队列中的消息

2.4 发布订阅 – 路由(Direct)

  之前我们学习了广播(Fanout), 在广播模式下, 只要往交换机发送消息, 那么交换机会将消息转发给所有绑定的队列, 而路由(Direct)又称为定向
  交换机绑定了A、B两个队列,我们往交换机中发消息时, 最终希望交换机只把消息转发给B队列,这个过程即路由
在这里插入图片描述

一、发布消息
往交换机发消息的时候,需要指定交换机转发给哪个队列(拥有通用routingKey的队列), 此处我们设置的routingKey为red

@Test
public void testSendDirectExchange() {
    // 交换机名称
    String exchangeName = "itcast.direct";
    // 消息
    String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "red", message);
}

我们发现这个交换机绑定了两个队列, 每个队列都设置了两个routingKey, 其中都有red, 所以这两个队列都能收到消息
在这里插入图片描述

二、订阅消息

@RabbitListener(queues =  "direct.queue1")
public void listenDirectQueue1(String msg){
    System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(queues =  "direct.queue2")
public void listenDirectQueue2(String msg){
    System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
}

总结:

  • 生产者不仅要知道往哪个交换机发消息, 同时还要通过路由秘钥(routingKey)指定交换机将消息转发给哪些队列(拥有同样的路由秘钥,即可转发)
  • 消费者只要知道订阅哪个队列中的消息(一直都没变过)

2.5 发布订阅 – 主题(Topic)

  主题(Topic)是对路由(Direct)的一种补充, 我们希望某一个组的队列都能收到消息, 但是在rabbitMq中无法将队列编组, 有了主题(Topic)后,我们可以将这一组的队列的routingKey都设置为china.#, 那只会Routingkey只要符合通配符规则, 例如china.jiangsuchina.jiangsu.suzhou 这个组都可以接收到消息
在这里插入图片描述

  • #:匹配一个或多个词
    item.#:能够匹配item.spu.insert 或者 item.spu
  • *:匹配不多不少恰好1个词
    item.*:只能匹配item.spu

一、发布消息
跟路由时一样, 交换机拿到routingKey后会去匹配对应的队列

@Test
public void testSendTopicExchange() {
    // 交换机名称
    String exchangeName = "itcast.topic";
    // 消息
    String message = "喜报!孙悟空大战哥斯拉,胜!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

当下判断两个队列都符合routingKey
在这里插入图片描述

二、订阅消息

@RabbitListener(queues =  "topic.queue1")
public void listenTopicQueue1(String msg){
    System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
}

@RabbitListener(queues =  "topic.queue2")
public void listenTopicQueue2(String msg){
    System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
}

总结:

  • 生产者不仅要知道往哪个交换机发消息, 同时还要通过路由秘钥(routingKey)指定交换机将消息转发给哪些队列(拥有同样的路由秘钥,即可转发)
  • 交换机会根据routingKey来匹配符合条件的队列(这个过程是交换机来完成,所以我们不用关心)
  • 消费者只要知道订阅哪个队列中的消息(一直都没变过)

2.SpringAMQP声明交换机和队列

2.1 使用bean的方式声明交换机和队列

启动项目后, 就会在rabbitMq中创建交换机和队列, 包括两者的绑定关系

package cn.itcast.mq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {
    /**
     * 声明交换机
     * @return Fanout类型交换机
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("itcast.fanout");
    }

    /**
     * 第1个队列
     */
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    /**
     * 第2个队列
     */
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

2.2 使用注解的方式声明交换机和队列

Direct定向
基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。
在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),
    exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
    System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue2"),
    exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
    System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
}

Topic主题

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue1"),
    exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
    key = "china.#"
))
public void listenTopicQueue1(String msg){
    System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue2"),
    exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
    key = "#.news"
))
public void listenTopicQueue2(String msg){
    System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/384931.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2024级浙江大学MBA提面申请流程参考

近年来浙大MBA项目的招生一直都有提前批面试的环节&#xff0c;而且每年在申请政策方面也会做出一些微调&#xff0c;但大的方面不会做调整&#xff0c;2024年MBA提面申请即将开始&#xff0c;对此杭州达立易考教育结合2023年的情况为大家梳理出来基本的申请流程和批次参考&…

【Spring Boot】Spring Boot以Repository方式整合Redis

1 简介 Redis是高性能的NoSQL数据库&#xff0c;经常作为缓存流行于各大互联网架构中。本文将介绍如何在Springboot中整合Spring Data Redis&#xff0c;使用Repository的方式操作。 代码结构如下&#xff1a; 2 整合过程 2.1 安装Redis数据库 为了节省时间&#xff0c;就直…

为什么很多计算机专业大学生毕业后还会参加培训?

基于IT互联网行业越来越卷的现状&#xff0c;就算是科班出身&#xff0c;很多也是达不到用人单位的要求。面对这样的现实情况&#xff0c;有的同学会选择继续深造&#xff0c;比如考个研&#xff0c;去年考研人数457万人次&#xff0c;可见越来越的同学是倾向考研提升学历来达到…

管道。环境变量和常用命令

文章目录管道与文件重定向的区别举例环境变量使用自己编写的程序像命令符一样常用命令grepagwctree . -acutsortmorehistory管道 管道类似于重定向&#xff0c;但是又不太一样&#xff0c;管道可以连接好几个&#xff0c;把第一个的输出当成第二个的输入&#xff0c;第二个的输…

实践Spring5 响应式编程框架WebFlux

WebFlux 以 Reactor 库为基础, 基于异步和事件驱动&#xff0c;可以让我们在不扩充硬件资源的前提下&#xff0c;提升系统的吞吐量和伸缩性。一、什么是 Spring WebFlux了解 WebFlux ,首先了解下什么是 Reactive Streams。Reactive Streams 是 JVM 中面向流的库标准和规范&…

论文推荐:ScoreGrad,基于能量模型的时间序列预测

能量模型&#xff08;Energy-based model&#xff09;是一种以自监督方式执行的生成式模型&#xff0c;近年来受到了很多关注。本文将介绍ScoreGrad&#xff1a;基于连续能量生成模型的多变量概率时间序列预测。如果你对时间序列预测感兴趣&#xff0c;推荐继续阅读本文。 为什…

Qt实现系统桌面目录下文件搜索的GUI:功能一:文件查找与现实

⭐️我叫恒心&#xff0c;一名喜欢书写博客的研究生在读生。 原创不易~转载麻烦注明出处&#xff0c;并告知作者&#xff0c;谢谢&#xff01;&#xff01;&#xff01; 这是一篇近期会不断更新的博客欧~~~ 有什么问题的小伙伴 欢迎留言提问欧。 功能点一&#xff1a;文件查找与…

《MongoDB入门教程》第27篇 创建索引

本文将会介绍 MongoDB 中的索引概念&#xff0c;以及如何利用 createIndex() 方法创建索引。 索引简介 假设存在一本包含介绍各种电影的图书。 如果想要查找一部名为“Pirates of Silicon Valley”的电影&#xff0c;我们需要翻阅每一页&#xff0c;直到发现该电影的介绍为止…

从ChatGPT的技术发展角度解析未来智能化的发展方向

ChatGPT 由人工智能研究实验室 OpenAI 于 2022年11 月 30 日推出。在推出时就带来不小的震动&#xff0c;但真正点燃全民热潮的是&#xff0c;应该是从今年的二月初算起&#xff0c;是全网全平台涵盖各行业领域的舆论盛况。 本文就以ChatGPT为切入点&#xff0c;从技术发展角度…

MQTT协议分析

目录 一、前言 二、MQTT协议概述 概念 基本原理 MQTT协议的结构 MQTT的QoS机制 QoS 0&#xff1a;最多一次传输 QoS 1&#xff1a;至少一次传输 QoS 2&#xff1a;恰好一次传输 三、MQTT的应用场景 四、MQTT的优点和缺点 五、MQTT协议的实现 六、实战体验MQTT …

自己动手写编译器:DFA状态机最小化算法

上一节我们完成了从NFA到DFA的状态机转换&#xff0c;有个问题是状态机并非处于最有状态&#xff1a; 在上图的状态机中&#xff0c;状态6和7其实可以合成一个状态点&#xff0c;本节我们看看如何将这类节点进行合并&#xff0c;使得状态机处于最精简状态(状态4也是终结点&…

KDZD土壤电阻率测试仪

一、简介 KDZD土壤电阻率测试仪专为现场测量接地电阻、土壤电阻率、接地电压、交流电压而精心设计制造的&#xff0c;采用新数字及微处理技术&#xff0c;精密4线法、3线法和简易2线法测量接地电阻&#xff0c;导入FFT(快速傅立叶变换)技术、AFC(自动频率控制)技术&#xff0c;…

基于树莓派4B设计的音视频播放器(从0开始)

一、前言 【1】功能总结 选择树莓派设计一款家庭影院系统,可以播放本地视频、网络视频直播、游戏直播、娱乐直播、本地音乐、网络音乐,当做FM网络收音机。 软件采用Qt设计、播放器引擎采用ffmpeg。 当前的硬件选择的是树莓派4B,烧写官方系统,完成最终的开发。 本篇文章主…

Qt QtCreator 安卓开发环境搭建

踩坑 我的qt是使用在线安装工具安装的&#xff0c;Qt版本使用的是5.15.2&#xff0c;QtCreator版本9.0.2 在网上很多教程都是如下步骤 1.安装qt 2.安装jdk 3.安装android-sdk 4.安装android-ndk 5.配置android设置 例如&#xff1a; https://blog.csdn.net/weixin_51363326/…

【p2p】专利:P2p网络中数据传输的方法、电子设备、装置、网络架构

基于混合CDN的低延时直播P2P技术实践 huya 大佬们的讲座。 而且发表了很多专利: 2018年的,点击直接阅读。 本文是学习笔记 本申请公开了P2P网络中数据传输的方法、电子设备、装置、网络架构,该方法包括步骤:接收服务器发送的数据包,所述数据包由共享资源拆分而成,并由服务…

金山轻维表项目进展自动通知

项目经理作为项目全局把控者&#xff0c;经常要和时间“赛跑”。需要实时了解到目前进展如何&#xff0c;跟进人是那些&#xff1f;哪些事项还未完成&#xff1f;项目整体会不会逾期&#xff1f;特别是在一些大型公司中&#xff0c;优秀的项目经理已经学会使用金山轻维表做项目…

05 Android基础--内部存储与外部存储

05 Android基础--内部存储与外部存储什么是内部存储&#xff0c;什么是外部存储&#xff1f;内部存储与外部存储的代码示例什么是内部存储&#xff0c;什么是外部存储&#xff1f; 1.内部存储与外部存储的存储介质&#xff1a; 内部存储的介质&#xff1a;RAM(内存) 内部ROM …

【连接池】什么是HikariCP?HikariCP 解决了哪些问题?为什么要使用 HikariCP?

文章目录什么是连接池什么是HikariCPHikariCP 解决了哪些问题&#xff1f;为什么要使用 HikariCP&#xff1f;HikariCP 的使用Maven支持数据库什么是连接池 数据库连接池负责分配、管理和释放数据库的连接。 数据库连接复用&#xff1a;重复使用现有的数据库长连接&#xff0…

PayPal轮询收款的那些事儿

想必做跨境电商独立站的小伙伴&#xff0c;对于PayPal是再熟悉不过了&#xff0c;PayPal是一个跨国际贸易的支付平台&#xff0c;对于做独立站的朋友来说跨境收款绝大部分都是依赖PayPal以及Stripe条纹了。简单来说PayPal跟国内的支付宝有点类似&#xff0c;但是PayPal它是跨国…

攒了一冬的甜,米易枇杷借力新电商走出川西大山

“绿暗初迎夏&#xff0c;红残不及春。魏花非老伴&#xff0c;卢橘是乡人。”苏轼文中的卢橘&#xff0c;就是枇杷&#xff0c;在苏轼看来&#xff0c;相较于姚黄魏紫&#xff0c;来自故乡四川的枇杷更为亲近。 四川省攀枝花市米易县是全国枇杷早熟产区之一&#xff0c;得益于…