202、RabbitMQ 之 使用 fanout 类型的Exchange 实现 Pub-Sub 消息模型---fanout类型就是广播类型

news2024/10/6 6:48:36

目录

  • ★ 使用 fanout 类型的Exchange 实现 Pub-Sub 消息模型
  • 代码演示:
    • 生产者:producer
    • 消费者:Consumer01
    • 消费者:Consumer02
    • 测试结果
  • 完整代码
    • ConnectionUtil
    • Publisher
    • Consumer01
    • Consumer02
    • pom.xml

★ 使用 fanout 类型的Exchange 实现 Pub-Sub 消息模型

就是声明一个 fanout 类型的 Exchange 来分发消息。消费者进行消费
fanout 类型就是广播模式

fanout 类型 的 Exchange 不会判断消息的路由key,直接将消息分发给绑定到该Exchange的所有队列。

生产者发送一条消息到fanout类型的Exchange后,绑定到该Exchange的所有队列都会收到该消息的一条副本,
而消费者也能分别从不同的队列中读取消息,互不干扰。

▲ fanout类型的Exchange可以很好地模拟JMS的Pub-Sub消息模型。

在这里插入图片描述

代码演示:

都是在前面一篇的代码基础上修改的。
需求:使用 fanout 类型的Exchange ,实行发布-订阅的功能,其实就是创建一个生产者和两个消费者,实现广播模式的消息分发。

在这里插入图片描述

生产者:producer

在生产者中声明Exchange ,然后声明两个消息队列 Queue,
然后给这个Exchange 绑定 这个两个Queue
在这里插入图片描述

在这里插入图片描述

消费者:Consumer01

两个消费者的代码没啥区别,
消费方法的参数 autoAck 都是true, 都是自动确认消费。
两个消费者各自消费自己指定的消息队列。

在这里插入图片描述

在这里插入图片描述

消费者:Consumer02

在这里插入图片描述
在这里插入图片描述

测试结果

消费生产者发送10条消息,两个消费者都能各自消费到10条消息就是正确的。

消息生产者使用fanout这个广播的类型发送消息。
在这里插入图片描述
两个消费者都能消费到10条消息,正确。
在这里插入图片描述

完整代码

ConnectionUtil

package cn.ljh.rabbitmq.util;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

//连接工具
public class ConnectionUtil
{
    //获取连接的方法
    public static Connection getConnection() throws IOException, TimeoutException
    {
        //创建连接工厂----这个ConnectionFactory源码可以看出有构造器,所以直接new一个出来
        ConnectionFactory connectionFactory =  new ConnectionFactory();
        //设置连接信息
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("ljh");
        connectionFactory.setPassword("123456");
        connectionFactory.setVirtualHost("/"); //连接虚拟主机
        //从连接工厂获取连接
        Connection connection = connectionFactory.newConnection();
        //返回连接
        return connection;
    }
}

Publisher

package cn.ljh.rabbitmq.producer;

import cn.ljh.rabbitmq.consumer.Consumer01;
import cn.ljh.rabbitmq.consumer.Consumer02;
import cn.ljh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

//消息生产者--使用fanout类型的exchange------就是广播模式
public class Publisher
{
    //常量:定义个Exchange的名字作为常量
    public static final String EXCHANGE_NAME = "myex01.fanout";

    public static void main(String[] args) throws IOException, TimeoutException
    {
        //1、创建连接
        Connection conn = ConnectionUtil.getConnection();
        //2、通过Connection获取Channel。
        Channel channel = conn.createChannel();

        //3、调用exchangeDeclare()方法声明Exchange、调用queueDeclare()方法声明队列,并完成队列与Exchange的绑定
        channel.exchangeDeclare(EXCHANGE_NAME,/* Exchange名字 */
                BuiltinExchangeType.FANOUT,/* Exchange 类型 */
                true,/* 是否持久化 */
                false,/* 是否自动栅除 */
                false,/* 是否为内部的 Exchange */
                null /* 指定 Exchange 的额外属性 */
        );

        //声明多个消息队列------声明第1个消息队列
        channel.queueDeclare(Consumer01.QUEUE01, true, false, false, null);

        //把 Exchange 和 Queue 绑定起来,绑定第一个消息队列
        channel.queueBind(Consumer01.QUEUE01,EXCHANGE_NAME,
                "" /* 因为Exchange 是fanout类型,所以无需 路由key */,
                null /* 指定 Exchange 的额外属性 */);

        //声明第2个消息队列
        channel.queueDeclare(Consumer02.QUEUE02, true, false, false, null);

        //把 Exchange 和 Queue 绑定起来,绑定第2个消息队列
        channel.queueBind(Consumer02.QUEUE02,EXCHANGE_NAME,
                "" /* 因为Exchange 是fanout类型,所以无需 路由key */,
                null /* 指定 Exchange 的额外属性 */);

        //生产者发送10条消息
        for (int i = 1; i <= 10; i++)
        {
            String message = "生产者发送的第【 " + i + " 】条消息的内容";

            //4、调用Channel的basicPublish()方法发送消息
            channel.basicPublish(EXCHANGE_NAME /* 向这个 fanout类型的 Exchange 发送消息 */,
                    "" /* 因为 Exchange 是fanout 类型,所以有没有路由key都无所谓 */,
                    null /*指定额外的消息的属性*/,
                    message.getBytes(StandardCharsets.UTF_8)/*消息体必须是字节数组类型-->byte[]*/
            );
            System.out.println("生产者发送【 "+i+" 】条消息完成");
        }
        //5、关闭资源
        //关闭通道
        channel.close();
        //关闭连接
        conn.close();
    }
}

Consumer01

package cn.ljh.rabbitmq.consumer;

import cn.ljh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

//消息消费者1
public class Consumer01
{
    // 使用 RabbitMQ Java Client 开发 消息消费者 的大致步骤如下:
    //(1)创建ConnectionFactory连接工厂,设置连接信息,再通过ConnectionFactory获取Connection连接。
    //(2)通过Connection获取Channel。
    //(3)根据需要、调用Channel的queueDeclare()方法声明队列,  Declare:声明、宣布
    //    如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列。
    //(4)调用Channel 的 basicConsume()方法开始处理消息,调用该方法时需要传入一个Consumer参数,该参数相当于JMS中的消息监听器。

    //常量
    public final static String QUEUE01 = "firstQueue";

    public static void main(String[] args) throws IOException, TimeoutException
    {
        //1、创建连接工厂,设置连接信息,然后再通过连接工厂获取连接
        Connection conn = ConnectionUtil.getConnection();

        //2、通过Connection获取Channel 消息通道
        Channel channel = conn.createChannel();

        //3、调用 Channel 的 queueDeclare() 方法声明队列,
        //   如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列
        channel.queueDeclare(QUEUE01, /* 声明的队列名 */
                true,    /* 消息队列是否持久化 */
                false,  /* 是否只允许该消息消费者消费该队列的消息,独占 */
                false, /* 是否自动删除 */
                null   /* 指定消息队列额外的属性 */);


        //4、调用Channel 的 basicConsume()方法开始处理消费消息
        channel.basicConsume(
                QUEUE01 /*消费这个消费队列里面的消息*/,
                true /*消息的确认模式:是否自动确认该消息已经被消费完成并返回确认消息给消息队列*/,
                new DefaultConsumer(channel)
                {
                    //处理消息:当这个消息队列收到消息的时候,这个方法就会被触发。重写这个方法:
                    @Override
                    public void handleDelivery(String consumerTag,
                                               Envelope envelope /*消息所在的信封,存放消息的exchange、路由key这些*/,
                                               AMQP.BasicProperties properties /*消息的那些属性*/,
                                               byte[] body /*body:消息的消息体*/) throws IOException
                    {
                        //把消息体中的消息拿出来
                        String message = new String(body, "UTF-8");
                        //printf:格式化输出函数   %s:输出字符串  %n:换行
                        System.err.printf("P2PConsumer收到来自Exchange为【%s】、路由key为【%s】的消息,消息内容为%s%n",
                                envelope.getExchange(),envelope.getRoutingKey(),message);

                    }
                }
        );
    }
}

Consumer02

package cn.ljh.rabbitmq.consumer;

import cn.ljh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

//消息消费者2
public class Consumer02
{
    // 使用 RabbitMQ Java Client 开发 消息消费者 的大致步骤如下:
    //(1)创建ConnectionFactory连接工厂,设置连接信息,再通过ConnectionFactory获取Connection连接。
    //(2)通过Connection获取Channel。
    //(3)根据需要、调用Channel的queueDeclare()方法声明队列,  Declare:声明、宣布
    //    如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列。
    //(4)调用Channel 的 basicConsume()方法开始处理消息,调用该方法时需要传入一个Consumer参数,该参数相当于JMS中的消息监听器。

    //常量
    public final static String QUEUE02 = "secondQueue";

    public static void main(String[] args) throws IOException, TimeoutException
    {
        //1、创建连接工厂,设置连接信息,然后再通过连接工厂获取连接
        Connection conn = ConnectionUtil.getConnection();

        //2、通过Connection获取Channel 消息通道
        Channel channel = conn.createChannel();

        //3、调用 Channel 的 queueDeclare() 方法声明队列,
        //   如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列
        channel.queueDeclare(QUEUE02, /* 声明的队列名 */
                true,    /* 消息队列是否持久化 */
                false,  /* 是否只允许该消息消费者消费该队列的消息,独占 */
                false, /* 是否自动删除 */
                null   /* 指定消息队列额外的属性 */);

        //4、调用Channel 的 basicConsume()方法开始处理消费消息
        channel.basicConsume(
                QUEUE02 /*消费这个名字的消费队列里面的消息*/,
                true/*消息的确认模式:是否自动确认该消息已经被消费完成并返回确认消息给消息队列*/,
                new DefaultConsumer(channel)
                {
                    //处理消息:当这个消息队列收到消息的时候,这个方法就会被触发。重写这个方法:
                    @Override
                    public void handleDelivery(String consumerTag,
                                               Envelope envelope /*消息所在的信封,存放消息的exchange、路由key这些*/,
                                               AMQP.BasicProperties properties /*消息的那些属性*/,
                                               byte[] body /*body:消息的消息体*/) throws IOException
                    {
                        //把消息体中的消息拿出来
                        String message = new String(body, "UTF-8");
                        //printf:格式化输出函数   %s:输出字符串  %n:换行
                        System.err.printf("P2PConsumer收到来自Exchange为【%s】、路由key为【%s】的消息,消息内容为%s%n",
                                envelope.getExchange(),envelope.getRoutingKey(),message);
                    }
                }
        );
    }


}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.ljh</groupId>
    <artifactId>rabbitmq_fanout</artifactId>
    <version>1.0.0</version>
    <name>rabbitmq_fanout</name>

    <!--  属性  -->
    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>11</java.version>
    </properties>

    <!--  依赖  -->
    <dependencies>
        <!-- RabbitMQ 的依赖库 -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.13.0</version>
        </dependency>

    </dependencies>


</project>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1084496.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

干货|小白也能自制电子相册赶紧码住~

你是否想拥有一个独一无二的电子相册&#xff0c;却又苦于不知道如何下手&#xff1f;今天教你一个简单的方法&#xff0c;即使你是小白&#xff0c;也能轻松自制电子相册&#xff01; 一、选择合适的工具 首先&#xff0c;你需要选择一个合适的工具来制作电子相册。有很多工具…

四化智造MES(WEB)与金蝶云星空对接集成原材料/标准件采购查询(待采购)连通采购订单新增(原材料采购-采购订单(变更)-TEST)

四化智造MES&#xff08;WEB&#xff09;与金蝶云星空对接集成原材料/标准件采购查询&#xff08;待采购&#xff09;连通采购订单新增(原材料采购-采购订单(变更)-TEST) 对接系统四化智造MES&#xff08;WEB&#xff09; MES建立统一平台上通过物料防错防错、流程防错、生产统…

Python接口自动化测试之【测试函数、测试类/测试方法的封装】

前言 在pythonpytest 接口自动化系列中&#xff0c;我之前的文章基本都没有将代码进行封装&#xff0c;但实际编写自动化测试脚本中&#xff0c;我们都需要将测试代码进行封装&#xff0c;才能被测试框架识别执行。 例如单个接口的请求代码如下&#xff1a; import requests…

echarts 中国地图效果,并附上小旗子

地图的基础部分 使用echarts开发中国地图&#xff0c;并修改地图默认颜色&#xff0c;以及hover效果以及背景色 可以放大缩小 以此文章记录 首先安装echarts npm install echarts 并引入 import * as echarts from echarts 然后去下载中国地图的 json数据 import * as echarts…

免费在线真好用的思维脑图

大家好这里是tony4geek 。 今天给大家介绍一个工具。思维脑图生成器。最近写文章需要用到思维脑图&#xff0c;如果手上没有xmind 这种类工具是挺麻烦的。下载xmind 还得破解注册很费时间。 看看有没有在线生成的&#xff0c;找了好久没有找到合适的&#xff0c;最后在国外一…

鱼哥赠书活动第②期:《AWD特训营:技术解析、赛题实战与竞赛技巧》《ATTCK视角下的红蓝对抗实战指南》《智能汽车网络安全权威指南》上下册

鱼哥赠书活动第①期&#xff1a; 《AWD特训营&#xff1a;技术解析、赛题实战与竞赛技巧》1.1介绍&#xff1a; 《ATT&CK视角下的红蓝对抗实战指南》1.1介绍&#xff1a; 《Kali Linux高级渗透测试》1.1介绍&#xff1a; 《智能汽车网络安全权威指南》上册1.1介绍&#xff…

外汇天眼:Bitcore与Amtop Markets Ltd──诓称数据分析操盘稳赚不赔,指控账户违法逼缴校验金

近几年网络投资风气盛行&#xff0c;但市面上充斥许多诈骗平台&#xff0c;透过保证获利、稳赚不赔等话术诱导民众投资&#xff0c;如今已成为社会不可忽视的严重威胁。 日前&#xff0c;外汇天眼就收到一位受害者爆料&#xff0c;分享她遭到Bitcore假交友诈骗的详细经过。 一开…

Mysql5.7大限将至升级Mysql 8.0过程记录(未完)

一、前言 时间很快&#xff0c;到2023年10月底&#xff0c;MySQL 5.7就到了它的EOL&#xff08;End of Life&#xff09;&#xff0c;届时将不会提供任何补丁&#xff0c;无法应对潜在的安全风险&#xff1b;是时候和 MySQL 5.7 说再见了&#xff01;&#xff01;&#xff01;&…

系统架构师2023备考新版教材-之计算机系统知识01

说明 本篇博客主要围绕2022年系统架构师最新版教程&#xff0c;算上时间&#xff0c;今年应该是这一版教材的第一次考试&#xff0c;说来也气人&#xff0c;一年考一次&#xff0c;然后我毅然就直接报名了&#xff0c;报名之前还不知道教程已经改版了&#xff0c;到近期刷题的…

2023年终旺季正式拉开帷幕,赛盈分销盘点亚马逊秋季Prime Day热销款式!

2023年终旺季正式拉开帷幕&#xff0c;卖家已陆续进入亚马逊秋季prime day、万圣节、圣诞节等各大假日促销的冲刺阶段。 和往年相比&#xff0c;今年的美国消费者将会在假日促销期间增加支出。海外权威机构Deloitte研究表明&#xff0c;11月份开始到次年1月份美国年终旺季线上销…

【WOFOST和PCSE】如何运用模型进行科学研究,如何设置实验和模拟,以及如何解释和分析模型结果

WOFOST&#xff08;WorldFoodStudies&#xff09;和PCSE&#xff08;PythonCropSimulationEnvironment&#xff09;是两个用于农业生产模拟的模型&#xff1a;WOFOST是一个经过多年开发和验证的模型&#xff0c;被广泛用于全球的农业生产模拟和农业政策分析&#xff1b;采用了模…

智能文件夹改名助手,秒级恢复原始文件夹名称,避免繁琐操作!

文件夹改名是我们在整理和管理文件时经常遇到的任务之一。但有时候&#xff0c;在改名的过程中&#xff0c;我们可能会因为操作失误或其他原因而需要恢复回原来的文件夹名称。为了帮助您避免繁琐的操作&#xff0c;我们为您提供了一款智能文件夹改名助手&#xff0c;让您能够在…

【TensorFlow2 之014】在 TF 2.0 中实现 LeNet-5

一、说明 在这篇文章中&#xff0c;我们将展示如何在 TensorFlow 中实现像 \(LeNet-5\) 这样的基础卷积神经网络。LeNet-5 架构由 Yann LeCun 于 1998 年发明&#xff0c;是第一个卷积神经网络。 数据黑客变种rs 深度学习 机器学习 TensorFlow 2020 年 2 月 29 日 | 0 …

JavaScript(上)

1.JavaScript概述 JavaScript 是一种客户端脚本语言。运行在客户端浏览器中&#xff0c;每一个浏览器都具备解析 JavaScript 的引擎 脚本语言&#xff1a;不需要编译&#xff0c;就可以被浏览器直接解析执行了 核心功能就是增强用户和 HTML 页面的交互过程&#xff0c;让页面…

400电话怎么办理

400电话&#xff0c;又称为全国统一客服热线电话&#xff0c;是一种企业或机构为了提供更便捷的客户服务而开通的电话号码。通过拨打400电话&#xff0c;客户可以免费或按照本地市话费率与企业或机构进行沟通&#xff0c;解决问题或获得相关服务。下面将介绍400电话的办理流程和…

企业集中式日志管理解决方案

集中式日志记录解决方案收集日志并统一来自各种网络设备&#xff08;如服务器、防火墙、路由器、工作站&#xff09;、应用程序&#xff08;如IIS、Apache、DHCP&#xff09;、入侵检测系统等的数据。该解决方案在中央控制台中显示日志&#xff0c;使其易于访问&#xff0c;日志…

【若依】定时任务问题:关闭了定时任务,但是依然在跑,且同一时刻跑了多条记录,为什么?

文章目录 问题1描述&#xff1a;原因:办法&#xff1a; 问题2描述&#xff1a;原因&#xff1a;办法&#xff1a; 问题1 描述&#xff1a; 定时任务关闭了&#xff0c; 但是服务器定时任务依然在跑 原因: 若依自带定时任务有缓存&#xff0c;且缓存是服务器内存&#xff0c…

布隆过滤器的优缺点及哈希切割问题

文章目录 1.布隆过滤器优点2.布隆过滤器缺陷3.哈希切割 1.布隆过滤器优点 增加和查询元素的时间复杂度为:O(K)(K为哈希函数的个数&#xff0c;一般较小)&#xff0c;与数据量大小无关哈希函数相互之间没有关系&#xff0c;方便硬件并行运算布隆过滤器不需要存储元素本身&#…

Stable Diffusion XL搭建

本文参考&#xff1a;Stable Diffusion XL1.0正式发布了&#xff0c;赶紧来尝鲜吧-云海天教程 Stable Diffision最新模型SDXL 1.0使用全教程 - 知乎 1、SDXL与SD的区别 &#xff08;1&#xff09;分辨率得到了提升 原先使用SD生成图片&#xff0c;一般都是生成512*512&…

软件测试工程师简历项目经验怎么写?--1000个已成功入职的软件测试工程师简历范文模板(含真实简历)

一、前言&#xff1a;浅谈面试 ​ 面试是我们进入一个公司的门槛&#xff0c;通过了面试才能进入公司&#xff0c;你的面试结果和你的薪资是息息相关的。那如何才能顺利的通过面试&#xff0c;得到公司的认可呢?面试软件测试要注意哪些问题呢?下面和笔者一起来看看吧。这里…