RabbitMQ工作模式-发布订阅模式

news2025/1/22 17:50:37

Publish/Subscribe(发布订阅模式)

官方文档: https://www.rabbitmq.com/tutorials/tutorial-three-python.html

使用fanout类型类型的交换器,routingKey忽略。每个消费者定义生成一个队列关绑定到同一个Exchange,每个消费者都可以消费完整的消息。

消息广播给所有订阅该消息的消费者。

在RabbitMQ中,生产者不是将消息直接发送给消息消息队列,实际上生产者根本不知道一个消息被发送到哪个队列。

生产者将消息发送给交换器。交换器非常简单,从生成者接收消息,将消息推送给消息队列。交换器必须清楚的知道要怎么处理接收到的消息。应该是追加到一个指定的队列,还是追加到多个队列,还是丢弃。规则就是交换器的类型。

在这里插入图片描述

发布订阅使用fanout的交换器,创建交换器,名称为test

channel.exchangeDeclare("test","fanout");

fanout交换器很简单,从名称就可以看出来(用风扇吹出去),将所有的收到的消息发给它的知道的所有队列。

存在一个默认的交换器。

此样例使用的是临时队列,即消费都实现将自动创建此队列,当消费都退出后,此队列也将自动删除。

队列名称如

amq.gen-gjKBgQ9PSmoj2YQGMOdPfA

样例代码

消费者1的代码:

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class OneConsumer {

  public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();

    factory.setUri("amqp://root:123456@node1:5672/%2f");

    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    // 声明的临时队列,名称由rabbitMQ自动生成
    String queueName = channel.queueDeclare().getQueue();
    System.out.println("临时队列的名称:" + queueName);

    // 定义交换机
    channel.exchangeDeclare("ex.testfan", BuiltinExchangeType.FANOUT, true, false, null);

    // 消息队列与交换机的绑定
    channel.queueBind(queueName, "ex.testfan", "");

    channel.basicConsume(
        queueName,
        new DeliverCallback() {
          @Override
          public void handle(String consumerTag, Delivery message) throws IOException {
            System.out.println(
                "one 获取到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
          }
        },
        new CancelCallback() {
          @Override
          public void handle(String consumerTag) throws IOException {}
        });
  }
}

消费者2

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class TwoConsumer {

  public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();

    factory.setUri("amqp://root:123456@node1:5672/%2f");

    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    // 生成的临时队列
    String queueName = channel.queueDeclare().getQueue();
    System.out.println("临时队列的名称:" + queueName);

    // 定义交换机
    channel.exchangeDeclare("ex.testfan", BuiltinExchangeType.FANOUT, true, false, null);

    // 消息队列与交换机的绑定
    channel.queueBind(queueName, "ex.testfan", "");

    channel.basicConsume(
        queueName,
        new DeliverCallback() {
          @Override
          public void handle(String consumerTag, Delivery message) throws IOException {
            System.out.println(
                "two 获取到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
          }
        },
        new CancelCallback() {
          @Override
          public void handle(String consumerTag) throws IOException {}
        });
  }
}

消费者3

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class ThirdConsumer {

  public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();

    factory.setUri("amqp://root:123456@node1:5672/%2f");

    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    // 生成的临时队列
    String queueName = channel.queueDeclare().getQueue();
    System.out.println("临时队列的名称:" + queueName);

    // 定义交换机
    channel.exchangeDeclare("ex.testfan", BuiltinExchangeType.FANOUT, true, false, null);

    // 消息队列与交换机的绑定
    channel.queueBind(queueName, "ex.testfan", "");

    channel.basicConsume(
        queueName,
        new DeliverCallback() {
          @Override
          public void handle(String consumerTag, Delivery message) throws IOException {
            System.out.println(
                "third 获取到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
          }
        },
        new CancelCallback() {
          @Override
          public void handle(String consumerTag) throws IOException {}
        });
  }
}

生产者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class Product {

  public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUri("amqp://root:123456@node1:5672/%2f");

    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    try {
      // 声明fanout类型交换机
      channel.exchangeDeclare("ex.testfan", "fanout", true, false, false, null);

      for (int i = 0; i < 20; i++) {
        channel.basicPublish(
            "ex.testfan",
            // 路由key
            "",
            // 属性
            null,
            // 信息
            ("hello world fan " + i).getBytes(StandardCharsets.UTF_8));
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    } finally {
      channel.close();
      connection.close();
    }
  }
}

观察下队列的绑定的情况:

在未启动消费都队列之前:

[root@nullnull-os ~]# rabbitmqctl list_exchanges --formatter pretty_table
Listing exchanges for vhost / ...
┌────────────────────┬─────────┐
│ name               │ type    │
├────────────────────┼─────────┤
│ amq.fanout         │ fanout  │
├────────────────────┼─────────┤
│ amq.rabbitmq.trace │ topic   │
├────────────────────┼─────────┤
│ amq.headers        │ headers │
├────────────────────┼─────────┤
│ amq.topic          │ topic   │
├────────────────────┼─────────┤
│ amq.direct         │ direct  │
├────────────────────┼─────────┤
│                    │ direct  │
├────────────────────┼─────────┤
│ amq.match          │ headers │
└────────────────────┴─────────┘
[root@nullnull-os ~]# rabbitmqctl list_bindings --formatter pretty_table
Listing bindings for vhost /...
[root@nullnull-os ~]# 

在未启动消费者之前,只有看到几个默认的生产者。绑定的队列为空。

启动三个消费者:

[root@nullnull-os ~]# rabbitmqctl list_exchanges --formatter pretty_table
Listing exchanges for vhost / ...
┌────────────────────┬─────────┐
│ name               │ type    │
├────────────────────┼─────────┤
│ amq.fanout         │ fanout  │
├────────────────────┼─────────┤
│ amq.rabbitmq.trace │ topic   │
├────────────────────┼─────────┤
│ amq.headers        │ headers │
├────────────────────┼─────────┤
│ amq.topic          │ topic   │
├────────────────────┼─────────┤
│ ex.testfan         │ fanout  │
├────────────────────┼─────────┤
│ amq.direct         │ direct  │
├────────────────────┼─────────┤
│                    │ direct  │
├────────────────────┼─────────┤
│ amq.match          │ headers │
└────────────────────┴─────────┘
[root@nullnull-os ~]# rabbitmqctl list_bindings --formatter pretty_table
Listing bindings for vhost /...
┌─────────────┬─────────────┬────────────────────────────────┬──────────────────┬────────────────────────────────┬───────────┐
│ source_name │ source_kind │ destination_name               │ destination_kind │ routing_key                    │ arguments │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│             │ exchange    │ amq.gen-VbV63vwAn0IBzC7n6I--vQ │ queue            │ amq.gen-VbV63vwAn0IBzC7n6I--vQ │           │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│             │ exchange    │ amq.gen-UG67rAw03FGbBupHX6o18g │ queue            │ amq.gen-UG67rAw03FGbBupHX6o18g │           │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│             │ exchange    │ amq.gen-HnQLeaOB1YOEJXXfXP5_Mg │ queue            │ amq.gen-HnQLeaOB1YOEJXXfXP5_Mg │           │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│ ex.testfan  │ exchange    │ amq.gen-HnQLeaOB1YOEJXXfXP5_Mg │ queue            │                                │           │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│ ex.testfan  │ exchange    │ amq.gen-UG67rAw03FGbBupHX6o18g │ queue            │                                │           │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│ ex.testfan  │ exchange    │ amq.gen-VbV63vwAn0IBzC7n6I--vQ │ queue            │                                │           │
└─────────────┴─────────────┴────────────────────────────────┴──────────────────┴────────────────────────────────┴───────────┘
[root@nullnull-os ~]# 

当启动生产者后,可以发现已经产生了3个默认的交换机及队列的绑定关系。以及手动绑定的3个队列的关系。

启动生产者,查看消费情况:

消费者1

临时队列的名称:amq.gen-VbV63vwAn0IBzC7n6I--vQ
one 获取到的消息:hello world fan 0
one 获取到的消息:hello world fan 1
one 获取到的消息:hello world fan 2
one 获取到的消息:hello world fan 3
one 获取到的消息:hello world fan 4
one 获取到的消息:hello world fan 5
one 获取到的消息:hello world fan 6
one 获取到的消息:hello world fan 7
one 获取到的消息:hello world fan 8
one 获取到的消息:hello world fan 9
one 获取到的消息:hello world fan 10
one 获取到的消息:hello world fan 11
one 获取到的消息:hello world fan 12
one 获取到的消息:hello world fan 13
one 获取到的消息:hello world fan 14
one 获取到的消息:hello world fan 15
one 获取到的消息:hello world fan 16
one 获取到的消息:hello world fan 17
one 获取到的消息:hello world fan 18
one 获取到的消息:hello world fan 19

消费者2:

临时队列的名称:amq.gen-KadV2OsCRLb84p2k_ijuww
two 获取到的消息:hello world fan 0
two 获取到的消息:hello world fan 1
two 获取到的消息:hello world fan 2
two 获取到的消息:hello world fan 3
two 获取到的消息:hello world fan 4
two 获取到的消息:hello world fan 5
two 获取到的消息:hello world fan 6
two 获取到的消息:hello world fan 7
two 获取到的消息:hello world fan 8
two 获取到的消息:hello world fan 9
two 获取到的消息:hello world fan 10
two 获取到的消息:hello world fan 11
two 获取到的消息:hello world fan 12
two 获取到的消息:hello world fan 13
two 获取到的消息:hello world fan 14
two 获取到的消息:hello world fan 15
two 获取到的消息:hello world fan 16
two 获取到的消息:hello world fan 17
two 获取到的消息:hello world fan 18
two 获取到的消息:hello world fan 19

消息者3:

临时队列的名称:amq.gen-TcqXVnoS2mjOpfCw1o1CZw
third 获取到的消息:hello world fan 0
third 获取到的消息:hello world fan 1
third 获取到的消息:hello world fan 2
third 获取到的消息:hello world fan 3
third 获取到的消息:hello world fan 4
third 获取到的消息:hello world fan 5
third 获取到的消息:hello world fan 6
third 获取到的消息:hello world fan 7
third 获取到的消息:hello world fan 8
third 获取到的消息:hello world fan 9
third 获取到的消息:hello world fan 10
third 获取到的消息:hello world fan 11
third 获取到的消息:hello world fan 12
third 获取到的消息:hello world fan 13
third 获取到的消息:hello world fan 14
third 获取到的消息:hello world fan 15
third 获取到的消息:hello world fan 16
third 获取到的消息:hello world fan 17
third 获取到的消息:hello world fan 18
third 获取到的消息:hello world fan 19

再停止几个消费者查看队列信息

[root@nullnull-os ~]# rabbitmqctl list_exchanges --formatter pretty_table
Listing exchanges for vhost / ...
┌────────────────────┬─────────┐
│ name               │ type    │
├────────────────────┼─────────┤
│ amq.fanout         │ fanout  │
├────────────────────┼─────────┤
│ amq.rabbitmq.trace │ topic   │
├────────────────────┼─────────┤
│ amq.headers        │ headers │
├────────────────────┼─────────┤
│ amq.topic          │ topic   │
├────────────────────┼─────────┤
│ ex.testfan         │ fanout  │
├────────────────────┼─────────┤
│ amq.direct         │ direct  │
├────────────────────┼─────────┤
│                    │ direct  │
├────────────────────┼─────────┤
│ amq.match          │ headers │
└────────────────────┴─────────┘
[root@nullnull-os ~]# rabbitmqctl list_bindings --formatter pretty_table
Listing bindings for vhost /...
[root@nullnull-os ~]# 

可以看到,当客户端退出之后,临时队列也就消失了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/950315.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

win10底部任务栏开机后长时间未响应的解决办法

https://blog.csdn.net/hj960511/article/details/128746025?share_tokenAA088876-4477-44B8-B978-5C7C7726D552&tt_fromcopy_link&utm_sourcecopy_link&utm_mediumtoutiao_ios&utm_campaignclient_share win10底部任务栏开机后长时间未响应的解决办法-CSDN博…

亚马逊云科技re:Inforce大会:为企业提供端到端的安全防护能力

2023年&#xff0c;生成式AI带来了无数的创新&#xff0c;并将会在行业应用中产生更多的新能力、新场景。与此同时&#xff0c;关于生成式AI的风险管控成为各方关注焦点&#xff0c;数据隐私、合规保护、防欺诈等&#xff0c;已成为生成式AI时代的安全合规的新话题。 随着云上业…

(一)SpringBoot 整合WebSocket 前端 uniapp 访问

第一次使用WebSocket&#xff0c;所以最需要一个及其简单的例子&#xff0c;跑通之后&#xff0c;增加自己对该技术的理解。&#xff08;技术基础介绍就免掉了&#xff0c;后面再补&#xff09; 案例逻辑&#xff1a;目前只有一个用户&#xff0c;而且是一个用户给服务器发送数…

linux系统中详解u-boot之网络移植与调试

​今天给大家讲一讲如何完善u-boot网络部分的移植和调试。 一、前章回顾 上一章&#xff0c;已经讲过如何讲uboot.2022.10版本移植到我们自己的imx6ull开发板上&#xff0c;但是最后编译下载后网络部分未能正确识别&#xff0c;今天我们就来讲一讲网络部分的调试。 上一篇ub…

静力触探数据智能预处理(2)

静力触探数据智能预处理&#xff08;2&#xff09; 前言 数据处理方式已由手工1.0、计算机辅助2.0到人工智能3.0的趋势发展。现场采集的静力触探数据通常是由仪器厂家开发的数据采集软件保存&#xff0c;将原始数据导入Excel中&#xff0c;数据格式需要花费一定的时间整理&am…

Git管理本地代码

一、Git配置 当安装完 Git 应该做的第一件事就是设置你的用户名称与邮件地址。 这样做很重要&#xff0c;因为每一个 Git 的提交都会使用这些信息&#xff0c;并且它会写入到你的每一次提交中&#xff0c;不可更改 – global全局配置 通过 --global 选项可以设置全局配置信息 …

第6篇:ESP32连接无源喇叭播放音乐《涛声依旧》

第1篇:Arduino与ESP32开发板的安装方法 第2篇:ESP32 helloword第一个程序示范点亮板载LED 第3篇:vscode搭建esp32 arduino开发环境 第4篇:vscodeplatformio搭建esp32 arduino开发环境 第5篇:doit_esp32_devkit_v1使用pmw呼吸灯实验 D5连接喇叭正极&#xff0c;GND连接喇叭负…

推荐一本AI+医疗书:《机器学习和深度学习基础以及医学应用》,附21篇精选综述

当代医学仍然存在许多亟待解决的问题&#xff0c;比如日益增加的成本、医疗服务水平的下降...但近几年AI技术的发展却给医疗领域带来了革命性的变化&#xff0c;因此AI医疗迅速兴起。 从目前已知的成果来看&#xff0c;人工智能在医学领域的应用已经相当广泛&#xff0c;智能诊…

【pyqt5界面化工具开发-14】初始牛刀-登录工具

目录 0x00 前言&#xff1a; 一、准备好ui的加载 二、获取对应的触发事件 三、触发事件绑定 三、输入内容的调用 三、完善登录逻辑 0x00 前言&#xff1a; 在逻辑代码的处理添加数据包的请求&#xff0c;返回数据包的判断&#xff0c;就可以完整实现登录检测的一个界面化…

zookeeper介绍、zookeeper的安装与配置

1、zookeeper介绍 1.1 官网说明 官方地址&#xff1a;http://zookeeper.apache.org/ 它是拿来管理 Hadoop、Hive、Pig的管理员&#xff0c; Apache Hbase和Apache Solr以及阿里的Dubbo等项目中都采用到了Zookeeper。 一句话&#xff1a;ZooKeeper是一个分布式协调技术、高性…

基于Kohonen网络的聚类算法

1.案例背景 1.1 Kohonen网络 Kohonen网络是自组织竞争型神经网络的一种,该网络为无监督学习网络,能够识别环境特征并自动聚类。Kohonen神经网络是芬兰赫尔辛基大学教授Teuvo Kohonen 提出的,该网络通过自组织特征映射调整网络权值,使神经网络收敛于一种表示形态。在这一形态中…

OLAP学习

OLAP又叫联机分析处理&#xff0c;联机分析处理(OLAP)的概念最早是由关系数据库之父E.F.Codd于1993年提出的。 当今的数据处理大致可以分成两大类&#xff1a;联机事务处理OLTP&#xff08;on-linetransactionprocessing&#xff09;、联机分析处理OLAP&#xff08;On-LineAna…

外观模式:简化复杂子系统的访问与使用

文章目录 1. 简介2. 外观模式的基本结构3. 外观模式的实现步骤4. 外观模式的应用与实例4.1 图形界面库的外观模式应用4.2 文件压缩与解压缩的外观模式应用4.3 订单处理系统的外观模式应用 5. 外观模式的优缺点5.1 优点5.2 缺点 6. 总结 1. 简介 外观模式是一种结构型设计模式&…

从过滤器初识责任链设计模式

下面用的过滤器都是注解方式 可以使用非注解方式,就是去web.xml配置映射关系 上面程序的执行输出是 再加一个过滤器 下面来看一段程序 输出结果 和过滤器是否非常相识 但是上面这段程序存在的问题:在编译阶段已经完全确定了调用关系,如果你想改变他们的调用顺序或者继续添加一…

基于MYSQL的主从同步和读写分离

目录 一.完成MySQL主从同步&#xff08;一主两从&#xff09; 1.主库配置 2.建立同步账号 3.锁表设置只读 4.备份数据库数据 5.主库备份数据上传到从库 6.从库上还原备份 7.解锁 8.从库上设定主从同步 9.启动从库同步开关 10.检查状态 二.基于MySQL一主两从配置&…

取数组中每个元素的最高位

1 题目 /*程序将一维数组a中N个元素的最高位取出&#xff0c;保存在一维数组b的对应位置。 程序运行结果为&#xff1a; a&#xff1a;82 756 71629 5 2034 b: 8 7 7 5 2 */ 2 思考 简单来说就是取一个数据的最高位。 一开始的笨方法没有办法判断数据的长度&#xff0c;后来…

nowcoder NC236题 最大差值

目录 题目描述&#xff1a; 示例1 示例2 题干解析&#xff1a; 暴力求解&#xff1a; 代码展示&#xff1a; 优化&#xff1a; 代码展示&#xff1a; 题目跳转https://www.nowcoder.com/practice/a01abbdc52ba4d5f8777fb5dae91b204?tpId128&tqId33768&ru/exa…

BFT最前线|AI透过胸片估测患者年龄,可揭示其患慢性病风险;中信建投:国产人形机器人核心零部件成本下行值得期待

文 | BFT机器人 AI视界 TECHNOLOGY NEWS 看点1 AI模拟芯片能效达传统芯片14倍 《自然》23日发表的研究报道了一种能效为传统数字计算机芯片14倍的人工智能&#xff08;AI&#xff09;模拟芯片。这一由IBM研究实验室开发的芯片在语音识别上的效率超过了通用处理器。该技术或能…

【java中的Set集合】HashSet、LinkedHashSet、TreeSet(最通俗易懂版!!)

目录 一、HashSet集合 1.HashSet集合的特点 2.HashSet常用方法 二、LinkedHashSet集合 LinkedHashSet集合的特点 三、TreeSet集合 1.TreeSet集合的特点 2.TreeSet的基本使用 四、HashSet、LinkedHashSet、TreeSet的使用场景 五、list和set集合的区别 一、HashSet集合 …