RabbitMQ系列(12)--Fanout交换机的简介与实现

news2024/9/19 11:11:43

1、Fanout交换机的介绍

接收所有的消息广播到它知道的队列中,类似于发布订阅模式,只要Fanout禁用RoutingKey,绑定同一交换机的队列都可同时收到消息;若Fanout启动了routingkey,则绑定同一交换机且routingkeyKey相同的队列才能收到同一消息

2、Fanout交换机的实现

(1)新建一个名为fanout的包,用于装发布确认的代码

效果图:

(2)新建一个名为Receive01的类用于编写消费者的代码

代码如下:

注:RabbitMqUtils工具类的实现在我的另一篇文章里,有需要的同学可以查看参考

https://blog.csdn.net/m0_64284147/article/details/129465871

package com.ken.fanout;

import com.ken.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

/**
 * 消息接收
 */
public class Receive01 {

    //声明交换机的名称
    public static  final String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();

        //声明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

        //声明一个队列(生成一个临时的队列,队列的名称是随机的,当消费者断开与队列的连接时,队列自动删除,减少我们对队列的管理)
        String queueName = channel.queueDeclare().getQueue();

        //绑定交换机与队列
        channel.queueBind(queueName,EXCHANGE_NAME,"");

        /**
         * 声明消费者接收消息后的回调方法(由于回调方法DeliverCallback是函数式接口,所以需要给DeliverCallback赋值一个函数,为了方便我们这里使用Lambda表达式进行赋值)
         * 为什么要这样写呢,是因为basicConsume方法里的参数deliverCallback的类型DeliverCallback用 @FunctionalInterface注解规定DeliverCallback是一个函数式接口,所以要往deliverCallback参数传的值要是一个函数
         *
         * 以下是DeliverCallback接口的源代码
         *  @FunctionalInterface
         *  public interface DeliverCallback {
         *      void handle (String consumerTag, Delivery message) throws IOException;
         *  }
         */
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("Receive01接收到的消息:" + new String(message.getBody(),"UTF-8"));
        };

        /**
         * 用信道对消息进行接收
         * 第一个参数:消费的是哪一个队列的消息
         * 第二个参数:消费成功后是否要自动应答,true代表自动应当,false代表手动应答
         * 第三个参数:消费者接收消息后的回调方法
         * 第四个参数:消费者取消接收消息后的回调方法(正常接收不调用)
         */
        channel.basicConsume(queueName,true,deliverCallback,consumerTag -> {});
    }

}

(3)复制Receive01类并粘贴重命名为Receive02

代码如下:

package com.ken.fanout;

import com.ken.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

/**
 * 消息接收
 */
public class Receive02 {

    //声明交换机的名称
    public static  final String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();

        //声明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

        //声明一个队列(生成一个临时的队列,队列的名称是随机的,当消费者断开与队列的连接时,队列自动删除,减少我们对队列的管理)
        String queueName = channel.queueDeclare().getQueue();

        //绑定交换机与队列
        channel.queueBind(queueName,EXCHANGE_NAME,"");

        /**
         * 声明消费者接收消息后的回调方法(由于回调方法DeliverCallback是函数式接口,所以需要给DeliverCallback赋值一个函数,为了方便我们这里使用Lambda表达式进行赋值)
         * 为什么要这样写呢,是因为basicConsume方法里的参数deliverCallback的类型DeliverCallback用 @FunctionalInterface注解规定DeliverCallback是一个函数式接口,所以要往deliverCallback参数传的值要是一个函数
         *
         * 以下是DeliverCallback接口的源代码
         *  @FunctionalInterface
         *  public interface DeliverCallback {
         *      void handle (String consumerTag, Delivery message) throws IOException;
         *  }
         */
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("Receive02接收到的消息:" + new String(message.getBody(),"UTF-8"));
        };

        /**
         * 用信道对消息进行接收
         * 第一个参数:消费的是哪一个队列的消息
         * 第二个参数:消费成功后是否要自动应答,true代表自动应当,false代表手动应答
         * 第三个参数:消费者接收消息后的回调方法
         * 第四个参数:消费者取消接收消息后的回调方法(正常接收不调用)
         */
        channel.basicConsume(queueName,true,deliverCallback,consumerTag -> {});
    }

}

(4)新建一个名为Emit的类用于编写生产者的代码

package com.ken.fanout;

import com.ken.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;

import java.util.Scanner;

/**
 * 发消息
 */
public class Emit {

    //声明交换机的名称
    public static  final String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        //从控制台读取要发送的信息
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.next();
           /**
            * 用信道对消息进行发布(消息持久化)
            * 第一个参数:发送到哪个交换机
            * 第二个参数:路由的Key值是哪个,本次是队列名
            * 第三个参数:其他参数信息
            * 第四个参数:发送消息的消息体
            */
            channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8"));
            System.out.println("生产者发送的消息:" + message);
        }

    }

}

(5)分别先运行Receive01、Receive02和Emit

(6)在Emit里输入消息,然后查看Receive01和Receive02接收消息的情况,若两个消费者都分别消费了一样的消息,证明我们成功实现了Fanout交换机

例:

Emit

Receive01

Receive02

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/726149.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Flutter生命周期小结

Flutter 中的生命周期,包含以下几个阶段: createState ,在 StatefulWidget 中创建 State 的方法,当 StatefulWidget 调用时会触发 createState 。initState ,在 State 初始化时调用,因此可以在此期间执行 …

TortoiseGit 如何回退到以前的版本?

要在 TortoiseGit 中回退到以前的版本,可以按照以下步骤进行操作: 在资源管理器中,右键单击你的 Git 仓库文件夹,然后选择 "TortoiseGit",再选择 "Show log"。这将打开 TortoiseGit 的日志界面。…

前端开发需要了解的工具集合

前端开发需要了解的一些工具,这些工具能够帮助你在项目开发中事半功倍。 1. nrm: npm registry 管理器 registry: npm 远程仓库的地址。 由于众所周知的原因,npm 官方仓库在国内特别的慢,所以我们需要用一些替代性方案,一种方案…

vue指令简介

什么是指令? 这些是特殊的说明,它们会在附加到 HTML 元素时更改其行为。 换句话说,这些是附加到 HTML 元素的特殊属性,这些属性可以更改行为并基于 DOM 的表达式值提供对 DOM 的控制。 所有 Vue 指令均以v-为前缀。 该前缀用于以…

一文读懂FPC(15)- FPC的挠曲性

FPC系列文章目录 1.什么是FPC 2.什么是R-FPC 3,FPC的基材 4.FPC基材压延铜和电解铜的区别 5,FPC的辅材 6,FPC常见的四种类型 7,FPC的生产流程简介 8,R-FPC的生产流程简介 9,FPC的发展及应用 10&a…

一、枚举类型——新特性(模式匹配-守卫)

守卫(guard)使你可以进一步细化匹配条件,而不只是简单地匹配类型。它是出现在类型判断和 && 后的一项测试。守卫可以是任何布尔表达式。如果选择器表达式和 case 的类型相同,并且守卫判断为 true,那么模式就匹…

RabbitMQ系列(18)--RabbitMQ基于插件实现延迟队列

1、前往RabbitMQ官网下载往RabbitMQ添加延迟消息的插件 RabbitMQ官网下载插件的网址:https://www.rabbitmq.com/community-plugins.html 2、下载rabbitmq_delayer_message_exchange插件(注:RabbitMQ是什么版本的,下载的插件就得是…

分布式锁:RedLock

https://mp.weixin.qq.com/s/8XHvt8vw2pai-QIujM2oxQ 为什么利用 setnx 实现分布式锁只能使用于单Redis实例,不支持Redis集群? 参考: https://blog.csdn.net/weixin_45525272/article/details/126562119

17 回归法做图像变化检测——建立图的回归等式(matlab程序)

1.简述 回归是确定两种或两种以上的变量间相互依赖的定量关系的方法。映射到本文就是用我们图像数据去预测该图像上衣服的价格。说直白点,就是通过X与Y确立函数关系式,只不过X换成了图片罢了。 2.代码 %% 建立图的回归等式 image1imread(11.TIF); image…

单词拆分00

题目链接 单词拆分 题目描述 注意点 s 和 wordDict[i] 仅由小写英文字母组成wordDict 中的所有字符串 互不相同不要求字典中出现的单词全部都使用字典中的单词可以重复使用 解答思路 最初想到用深度右边遍历来做,实现了功能但是由于做了很多重复判断时间复杂度…

OpenCV读取两张图像将下半部分(从中间行开始)的所有像素值设置为0黑色

#include <iostream> #include <opencv2/imgcodecs.hpp> #include <opencv2/opencv.hpp> #include

AI智能语音机器人的功能和作用都有哪些?

智能语音机器人是一种能够使用自然语言处理技术和人工智能算法&#xff0c;通过声音与用户进行交互的机器人。它可以回答用户提出的问题、处理用户的投诉、提供产品或服务的相关信息等等。 实现一个智能语音机器人需要涉及多个技术领域&#xff0c;包括自然语言处理、语音识别…

C#,ALGLIB(01)——支持C#开发、跨平台的、优秀的、开源的数值分析和数据处理库ALGLIB简介

1 关于ALGLIB ALGLIB是一个跨平台的数值分析和数据处理库。 它支持五种编程语言&#xff08;C&#xff0c;C#&#xff0c;Java&#xff0c;Python&#xff0c;Delphi&#xff09;和几种操作系统&#xff08;Windows和POSIX&#xff0c;包括Linux&#xff09;。 ALGLIB的功能包…

十一、Docker网络(Docker network)

学习参考&#xff1a;尚硅谷Docker实战教程、Docker官网、其他优秀博客(参考过的在文章最后列出) 目录 一、什么是docker network1.1 介绍1.2 docker 启动后的网络情况1.3 能干什么&#xff1f; 二、Docker网络相关命令2.1 查看网络2.2 查看网络源数据2.3 创建网络2.4 删除网络…

【RuoYi-Cloud-Plus】学习笔记 08 - Sentinel(三)流量控制知识整理

文章目录 前言参考目录版本说明学习笔记1、概述2、基于调用关系的流量控制&#xff08;流控模式&#xff09;2.1、流量规则 FlowRule2.2、选择节点3、基于QPS/并发数的流量控制&#xff08;流控效果&#xff09;3.1、默认方式&#xff08;直接拒绝&#xff09;3.2、冷启动 Warm…

数通王国历险记之数据从发出到接收的细节介绍{封装与解封装}

系列文章目录 数通王国历险记&#xff08;5&#xff09; 目录 前言 一&#xff0c;数据封装的全过程 1.1&#xff0c;应用层的封装形式 1.2&#xff0c;传输层的封装形式 理解&#xff1a; 1.3&#xff0c;网络层的封装形式 理解&#xff1a; 1.4&#xff0c;数据链路层…

Sublime Text,灵感犹如星辰,点亮创作之路

目录 引言Sublime Text的优点Sublime Text的缺点总结 Sublime Text 官方网站 引言 在这个快速发展的数字时代&#xff0c;创作者们面临着越来越多的选择&#xff0c;以提高他们的生产力和工作效率。而在众多的编辑软件中&#xff0c; Sublime Text 独树一帜&#xff0c;被誉为创…

mac怎么把m4a转换成mp3?

mac怎么把m4a转换成mp3&#xff1f;大家都知道m4a是苹果公司专属的音频文件格式&#xff0c;因此它是可以直接在mac电脑上打开播放的&#xff0c;但这并不代表m4a音频文件可以在其他播放器或者播放设备上直接打开和使用&#xff0c;相信这个问题大家都遇到过&#xff0c;造成这…

【Vivado那些事儿】动态时钟的使用

时钟是每个 FPGA 设计的核心。如果我们正确地设计时钟架构、没有 CDC 问题并正确进行约束设计&#xff0c;就可以减少与工具斗争的时间。 但对于某些应用&#xff0c;我们希望能够更改某些IP中的时钟频率。其中一个例子是在图像处理管道中&#xff0c;输出分辨率可以动态变化&a…