RabbitMQ: Publish/Subscribe结构

news2025/1/13 15:46:32

生产者 

package com.qf.mq2302.publishSub;

import com.qf.mq2302.utils.MQUtils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class EmitLog {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception {

        Connection conn = MQUtils.getConnection();
        Channel channel = conn.createChannel();

        // 在mq中声明一个交换机
        /**
         * 第一个参数:交换机的名字
         * 第二个参数:交换机的类型,fanout代表该交换机会把收到的消息无差别投递给所有他关联的队列
         */
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String msg = "hello fanout!";

        /**
         * 第一个参数,交换机的名字
         * 第二个参数:如果交换机是 fanout类型的,可以写空串 ;因为fanout类型的交换机会把消息无差别向关联队列投递
         */
        channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes("utf-8"));


        channel.close();
        conn.close();
    }
}

消费者1

package com.qf.mq2302.publishSub;

import com.qf.mq2302.utils.MQUtils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

import java.io.IOException;

public class ReceiveLogs01 {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception {

        Connection conn = MQUtils.getConnection();
        Channel channel = conn.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 在mq中声明一个名字是随机字符串的队列(队列的所有属性都是默认值),返回队列的名字
        String queueName = channel.queueDeclare().getQueue();
        // 把队列和交换机建立好绑定关系
        /**
         * 参数1: 队列名
         * 参数2: 交换机名
         * 参数3: routingkey,注意,如果交换机是fanout类型,可以写空串
         */
        channel.queueBind(queueName,EXCHANGE_NAME,"");

        channel.basicQos(1);
        channel.basicConsume(queueName, false, new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
                byte[] body = message.getBody();
                String msg = new String(body, "utf-8");
                System.out.println("01:"+msg);

                // 手动ack
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
            }
        },c -> {});

    }
}

消费者2

package com.qf.mq2302.publishSub;

import com.qf.mq2302.utils.MQUtils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

import java.io.IOException;

public class ReceiveLogs02 {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception {

        Connection conn = MQUtils.getConnection();
        Channel channel = conn.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 在mq中声明一个名字是随机字符串的队列(队列的所有属性都是默认值),返回队列的名字
        String queueName = channel.queueDeclare().getQueue();
        // 把队列和交换机建立好绑定关系
        /**
         * 参数1: 队列名
         * 参数2: 交换机名
         * 参数3: routingkey,注意,如果交换机是fanout类型,可以写空串
         */
        channel.queueBind(queueName,EXCHANGE_NAME,"");

        channel.basicQos(1);
        channel.basicConsume(queueName, false, new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
                byte[] body = message.getBody();
                String msg = new String(body, "utf-8");
                System.out.println("02:"+msg);

                // 手动ack
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
            }
        },c -> {});

    }
}

消费者3

package com.qf.mq2302.publishSub;

import com.qf.mq2302.utils.MQUtils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

import java.io.IOException;

public class ReceiveLogs03 {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception {

        Connection conn = MQUtils.getConnection();
        Channel channel = conn.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 在mq中声明一个名字是随机字符串的队列(队列的所有属性都是默认值),返回队列的名字
        String queueName = channel.queueDeclare().getQueue();
        // 把队列和交换机建立好绑定关系
        /**
         * 参数1: 队列名
         * 参数2: 交换机名
         * 参数3: routingkey,注意,如果交换机是fanout类型,可以写空串
         */
        channel.queueBind(queueName,EXCHANGE_NAME,"");

        channel.basicQos(1);
        channel.basicConsume(queueName, false, new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
                byte[] body = message.getBody();
                String msg = new String(body, "utf-8");
                System.out.println("03:"+msg);

                // 手动ack
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
            }
        },c -> {});

    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/983923.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

word转PDF文件变小,图片模糊

word论文29M,文件——另存为——只有1.5M左右,图片压缩严重,图片看不清。 word中很多大图,5M一张的图,所以word很大。 找了很多方法,转换后都在2M左右,勉强可以。 直到找到了这个&#xff0c…

机器学习基础之《分类算法(6)—决策树》

一、决策树 1、认识决策树 决策树思想的来源非常朴素,程序设计中的条件分支结构就是if-else结构,最早的决策树就是利用这类结构分割数据的一种分类学习方法 2、一个对话的例子 想一想这个女生为什么把年龄放在最上面判断!!&…

PC调试手机微信浏览器

准备工具 一部手机 一台电脑 一条数据线 首先用数据线把手机与电脑连接 然后手机进入到开发者模式并且要是开启状态,具体进入方式,根据机型不同,进入方式不同,自行百度。 进入到开发者选项之后,开启USB调试模式。 然…

2023年最全ins商店开通运营攻略

借助 Instagram 商店,品牌可以策划一系列可购物的商品,这些商品可通过其 Instagram 个人资料直接访问。这使得在应用程序上销售更容易,也被潜在客户发现。 一、什么是Instagram Shop? Instagram 商店为商家提供了一种在 Instagra…

【Liunx】进程概念,查看进程,进程调用,创建子进程

进程 1.什么是进程2.查看进程3.常见进程调用4.创建子进程 1.什么是进程 以前我们在书上或者其他途径了解到进程的概念。 一个运行起来(加载到内存)的的程序叫做进程。 在内存中的程序叫做进程。 进程与程序相比具有动态属性。 这里的概念比较抽象&#…

Task :app:compileDebugKotlin FAILED

gradle.properties 里面加上 android.enableJetifiertrue

PyCharm下载安装

PyCharm下载链接 点击下载PyCharm Community Edition社区版(PyCharm Professional专业版需要收费,但可以免费试用 30 天,也可以找到激活方式;而社区版是完全免费的,初学者学习 Python建议使用社区版,不会有…

Android studio自定义输出编译apk的名称

//输出apk名称android.applicationVariants.all { variant ->variant.outputs.all {//com.android.app-debug-1.0.apkoutputFileName "${variant.applicationId}-${variant.name}-${variant.versionName}.apk"//debug-1.0.apkoutputFileName "${variant.na…

内网穿透的应用-不再依赖iCloud!利用群晖生态,自己掌控本地SSD的云存储!

文章目录 前言本教程解决的问题是:按照本教程方法操作后,达到的效果是想使用群晖生态软件,就必须要在服务端安装群晖系统,具体如何安装群晖虚拟机请参考: 1. 安装并配置synology drive1.1 安装群辉drive套件1.2 在局域…

2023-python-解释器是什么东西?

传送门 对比学习一下: 下面的是编译型,比如c等; 先compiler编译成二进制形式的目标文件(Object File),然后链接起来; 解释型python: 解释器的compiler对正在运行中的文件中的代码进行一个 词法…

一文彻底理解什么是同步和异步!

相信很多同学遇到同步异步这两个词的时候大脑瞬间就像红绿灯失灵的十字路口一样陷入一片懵逼的状态: 是的,这两个看上去很像实际上也很像的词汇给博主造成过很大的困扰,这两个词背后所代表的含义到底是什么呢? 我们先从工作场景…

C++项目实战——基于多设计模式下的同步异步日志系统-②-相关技术补充(不定参函数)

文章目录 专栏导读不定参函数C风格不定参函数不定参宏函数 专栏导读 🌸作者简介:花想云 ,在读本科生一枚,C/C领域新星创作者,新星计划导师,阿里云专家博主,CSDN内容合伙人…致力于 C/C、Linux 学…

【autodesk】浏览器中渲染rvt模型

使用Forge完成渲染 Forge是什么 为什么能够渲染出来rvt模型 Forge是由Autodesk开发的一套云端开发平台和工具集。在Forge平台中,有一个名为"Model Derivative"的服务,它可以将包括RVT(Revit)在内的多种BIM&#xff08…

【无线电力传输】12 V 直流风扇无线电力传输系统的实现(Simulink)

💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…

华为OD机试 - 最长的指定瑕疵度的元音子串 - 正则表达式(Java 2023 B卷 200分)

目录 专栏导读一、题目描述二、输入描述三、输出描述四、解题思路五、Java算法源码六、效果展示1、输入2、输出3、说明 华为OD机试 2023B卷题库疯狂收录中,刷题点这里 专栏导读 本专栏收录于《华为OD机试(JAVA)真题(A卷B卷&#…

高速电路设计笔记----第一章

一、需求。 1、电路设计首先要做的是明确需求。 2、明确需求后需要对CPU进行选型、电源的选型、退耦电容选型。 3、画原理图需要兼顾PCB上的器件布局。中间电阻电容如果是靠近下一级,在原理图中画的时候也应该是靠近下一级处画。 4、按照PCB上电容的排列顺序绘制…

Java复习-25-单例设计模式

单例设计模式 目的(使用场景) 在实际开发下,会存在一种情况:某一种类在程序的整个生命周期中,只需要实例化一次就足够了。例如,系统数据类,由于操作系统只有一个,因此在程序初始化…

【PMO项目管理】深入了解项目管理 | Stakeholder 利益相关者 | 利益相关者之间的立场差异

💭 写在前面:本文将带您深入了解项目管理的核心概念和关键要素。我们将从项目管理的基本理解开始,逐步探讨其领域、复杂性和变化的重点,以及项目管理的具体过程。我们还将研究项目的性质以及成功项目所必备的条件。在此过程中&…

PandaGPT部署演示

PandaGPT 是一种通用的指令跟踪模型,可以看到和听到。实验表明,PandaGPT 可以执行复杂的任务,例如生成详细的图像描述、编写受视频启发的故事以及回答有关音频的问题。更有趣的是,PandaGPT 可以同时接受多模态输入并自然地组合它们…

ClickHouse 存算分离改造:小红书自研云原生数据仓库实践

ClickHouse 作为业界性能最强大的 OLAP 系统,在小红书内部被广泛应用于广告、社区、直播和电商等多个业务领域。然而,原生 ClickHouse 的 MPP 架构在运维成本、弹性扩展和故障恢复方面存在较大局限性。为应对挑战,小红书数据流团队基于开源 C…