RabbitMQ消息模型之发布订阅Publish-Subscribe

news2025/1/23 11:58:41

发布订阅模型 Publish/Subscribe

发布订阅模型也称为广播模型,交换机类型需要指定为Fanout,正如从名称中猜到的那样,它是将接收到的所有消息广播到它知道的所有队列中。每个消费者都监听自己的队列,所以同一个消息,会被所有的消费者共同消费。Fanout 这种交换类型并不能给我们带来很大的灵活性,它只能进行无意识的广播。
image-20220526173046882

在广播模式下,消息发送流程是这样的:

  • 可以有多个消费者。
  • 每个消费者有自己的Queue。
  • 每个队列都要绑定到Exchange。
  • 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
  • 交换机把消息发送给绑定过的所有队列。
  • 队列的消费者都能拿到消息。实现一条消息被多个消费者消费。

创建生产者

public class MyProducer {

    @Test
    public void test() throws Exception {
        // Fanout模式不需要指定队列
        String queue = "";
        // 交换机
        String exchange = "logs";

        // 创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setHost("xuewei.world");
        factory.setUsername("xuewei");
        factory.setPassword("123456");
        factory.setPort(5672);

        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare(exchange, "fanout");
        for (int i = 0; i < 3; i++) {
            // 发布消息
            channel.basicPublish(exchange, queue, null, ("DEBUG LOG -> " + i).getBytes());
            channel.basicPublish(exchange, queue, null, ("INFO LOG -> " + i).getBytes());
            channel.basicPublish(exchange, queue, null, ("WARN LOG -> " + i).getBytes());
            channel.basicPublish(exchange, queue, null, ("ERROR LOG -> " + i).getBytes());
        }
    }
}

创建消费者1

public class MyConsumer1 {

    public static void main(String[] args) throws Exception {
        // 指定交换机
        String exchange = "logs";

        // 创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setHost("xuewei.world");
        factory.setUsername("xuewei");
        factory.setPassword("123456");
        factory.setPort(5672);

        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //绑定交换机
        channel.exchangeDeclare("logs", "fanout");
        //创建临时队列
        String queue = channel.queueDeclare().getQueue();
        //将临时队列绑定exchange
        channel.queueBind(queue, "logs", "");
        //处理消息
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1: " + new String(body));
                // TODO 业务处理
            }
        });
    }
}

创建消费者2

public class MyConsumer2 {

    public static void main(String[] args) throws Exception {
        // 指定交换机
        String exchange = "logs";

        // 创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setHost("xuewei.world");
        factory.setUsername("xuewei");
        factory.setPassword("123456");
        factory.setPort(5672);

        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //绑定交换机
        channel.exchangeDeclare("logs", "fanout");
        //创建临时队列
        String queue = channel.queueDeclare().getQueue();
        //将临时队列绑定exchange
        channel.queueBind(queue, "logs", "");
        //处理消息
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2: " + new String(body));
            }
        });
    }
}

image-20220526175729612

两个消费者同时都收到了消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1268401.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

多模块项目打包部署

目录结构 这些模块间相互依赖&#xff0c;打包的时候打父模块&#xff0c;就是带root的这个 先clean&#xff0c;再package&#xff0c;就跟一般的项目一样了。 有可能遇到报错Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:2.22.2:test&#xff…

[Java]JUC并发编程

JUC并发编程 一、什么是JUC 使用到 java.util 工具包、包、分类 二、线程和进程 进程&#xff1a;一个正在运行的程序&#xff0c;QQ.exe Music.exe 程序的集合&#xff1b; 一个进程往往可以包含多个线程&#xff0c;至少包含一个&#xff01; Java默认有两个线程&#x…

微服务链路追踪组件SkyWalking实战

概述 微服务调用存在的问题 串联调用链路&#xff0c;快速定位问题&#xff1b;理清服务之间的依赖关系&#xff1b;微服务接口性能分析&#xff1b;业务流程调用处理顺序&#xff1b; 全链路追踪&#xff1a;对请求源头到底层服务的调用链路中间的所有环节进行监控。 链路…

短 URL 生成器设计:百亿短 URL 怎样做到无冲突?

Java全能学习面试指南&#xff1a;https://javaxiaobear.cn 我们先来看看&#xff0c;当高并发遇到海量数据处理时的架构。在社交媒体上&#xff0c;人们经常需要分享一些 URL&#xff0c;但是有些 URL 可能会很长&#xff0c;比如&#xff1a; https://time.geekbang.org/hyb…

C#图像处理OpenCV开发指南(CVStar,03)——基于.NET 6的图像处理桌面程序开发实践第一步

1 Visual Studio 2022 开发基于.NET 6的OpenCV桌面程序 1.1 为什么选择.NET 6开发桌面应用&#xff1f; 选择 .NET 6&#xff08;最早称为 .NET Core&#xff09;而非 Frameworks.NET 的理由是&#xff1a;&#xff08;1&#xff09;跨平台&#xff1b;已经支持Windows,Linux…

PyQt基础_008_ 按钮类控件QSpinbox

基本操作 import sys from PyQt5.QtCore import * from PyQt5.QtGui import * from PyQt5.QtWidgets import *class spindemo(QWidget):def __init__(self, parentNone):super(spindemo, self).__init__(parent)self.setWindowTitle("SpinBox 例子")self.resize(300,…

错误 LNK2001 无法解析的外部符号 __imp__CrtDbgReport

”属性“ -->”设置“ --> ”c“ – > ”代码生成“ --> ”运行库“ &#xff0c;将 ”多线程(MT)“ 改为 ”多线程(MTD)“。

JavaScript包装类型

前端面试大全JavaScript包装类型 &#x1f31f;经典真题 &#x1f31f;包装类型 &#x1f31f;真题解答 &#x1f31f;总结 &#x1f31f;经典真题 是否了解 JavaScript 中的包装类型&#xff1f; &#x1f31f;包装类型 在 ES 中&#xff0c;数据的分类分为基本数据类型…

NCo3.1(08) - Nco3 服务器端编程

本篇博文不再重复ABAP调用外部服务器的基础&#xff0c;只介绍 NCo3 开发的过程和要点。需要了解相关知识点的小伙伴们自行参考&#xff1a; SAP接口编程 之JCo3.0系列(06) - Jco服务器端编程 PyRFC 服务器端编程要点 创建项目 新建一个 Console 项目&#xff0c;选择 .Net …

第一个C代码讲解

文章目录 编写C文件创建文本文件编写代码修改文件后缀切换文件路径 编译代码打开命令行使用gcc编译代码运行程序双击运行使用命令行运行 代码分析编译过程 编写C文件 编辑C代码文件的工具有很多&#xff0c;为了让大家初学的时候摆脱编译软件的干扰&#xff0c;更容易理解编译过…

数据结构 -- 并查集与图

目录 1.并查集 1.结构 2.原理 3.代码实现 1.存储 2.寻找根节点 3.是否为同一集合 4.求集合个数 5.合并为同一集合中 整体代码 2.图 1.基本知识 1.各个属性 2.特殊名词 3.图的解释 2.图的表示 1.邻接矩阵 2.邻接表 3.图的遍历 1.BFS--广度优先遍历 2.DFS--…

Python with提前退出:坑与解决方案

Python with提前退出&#xff1a;坑与解决方案 问题的起源 早些时候使用with实现了一版全局进程锁&#xff0c;希望实现以下效果&#xff1a; Python with提前退出&#xff1a;坑与解决方案 全局进程锁本身不用多说&#xff0c;大部分都依靠外部的缓存来实现的&#xff0c;r…

python读取excel自动化生成sql建表语句和java实体类字段

1、首先准备一个excel文件&#xff1a; idtypenameidint学号namestring姓名ageint年龄sexstring性别weightdecimal(20,4)体重scoredecimal(20,4)分数 2、直接生成java字段和注释&#xff1a; import pandas as pddf pd.read_excel(test.xlsx, sheet_nameSheet1)for i in ran…

k8s中批量处理Pod应用的Job和CronJob控制器介绍

目录 一.Job控制器 1.简介 2.Jobs较完整解释 3.示例演示 4.注意&#xff1a;如上例的话&#xff0c;执行“kubectl delete -f myJob.yaml”就可以将job删掉 二.CronJob&#xff08;简写为cj&#xff09; 1.简介 2.CronJob较完整解释 3.案例演示 4.如上例的话&#xf…

【MySQL】常用内置函数:数值函数 / 字符串函数 / 日期函数 / 其他函数

文章目录 数值函数round()&#xff1a;四舍五入ceiling()&#xff1a;上限函数floor()&#xff1a;地板函数abs()&#xff1a;计算绝对值rand()&#xff1a;生成0-1的随机浮点数 字符串函数length()&#xff1a;获取字符串中的字符数upper() / lower()&#xff1a;将字符串转化…

使用Ray创建高效的深度学习数据管道

大家好&#xff0c;用于训练深度学习模型的GPU功能强大但价格昂贵。为了有效利用GPU&#xff0c;开发者需要一个高效的数据管道&#xff0c;以便在GPU准备好计算下一个训练步骤时尽快将数据传输到GPU&#xff0c;使用Ray可以大大提高数据管道的效率。 1.训练数据管道的结构 首…

优化邮件群发效果的方法与策略

怎样优化邮件群发效果&#xff1f;这是许多企业在进行邮件营销时常常被问到的问题。邮件营销是一种高效且经济实惠的市场推广方式&#xff0c;但如何使邮件真正引起接收者的兴趣并产生预期的效果并不容易。好的营销效果可以带来高回报、高收益率&#xff0c;但是怎么提升群发效…

工会排队奖励模式:创新营销策略,实现共赢局面

在当今的商业环境中&#xff0c;创新营销策略的重要性日益凸显。工会排队奖励模式作为一种新型的营销策略&#xff0c;旨在通过结合线上和线下消费&#xff0c;激励消费者购买产品或服务&#xff0c;并获得返现奖励。这种模式通过将消费者的支出和商家的抽成资金纳入奖金池&…

CSS3样式详解之圆角、阴影及变形

目录 前言一、圆角样式&#xff08;border-radius&#xff09;二、元素阴影&#xff08;box-shadow&#xff09;三、过渡动画样式&#xff08;transition&#xff09;1. transition-property(用于设置属性名称)2. transition-duration&#xff08;设置时间&#xff09;3. trans…

7、信息收集(2)

文章目录 一、指纹识别1、Nmap工具2、Wafw00f工具 二、使用Maltego进行情报收集 一、指纹识别 1、Nmap工具 命令一&#xff1a;nmap -sS -sV <ip>&#xff0c;使用TCP SYN的方式&#xff0c;扫描目标主机上常规端口运行的服务版本。 -sS&#xff1a;指定使用TCP SYN的方…