RabbitMQ、RabbitMQ发布/订阅模式

news2025/1/11 21:47:52

1.RabbiMQ

RabbitMQ是一个消息中间件
MQ的基本结构
在这里插入图片描述

1.1RabitMQ安装

参考:Docker安装
Docker中部署RabbitMQ

2.入门案例

2.1.publisher实现

package cn.itcast.mq.helloworld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class PublisherTest {
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.150.101");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.发送消息
        String message = "hello, rabbitmq!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("发送消息成功:【" + message + "】");

        // 5.关闭通道和连接
        channel.close();
        connection.close();

    }
}

2.2.consumer实现

代码实现:

package cn.itcast.mq.helloworld;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.150.101");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("itcast");
        factory.setPassword("123321");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.订阅消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 5.处理消息
                String message = new String(body);
                System.out.println("接收到消息:【" + message + "】");
            }
        });
        System.out.println("等待接收消息。。。。");
    }
}

2.3.总结

基本消息队列的消息发送流程

  1. 建立connection

  2. 创建channel

  3. 利用channel声明队列

  4. 利用channel向队列发送消息

基本消息队列的消息接收流程

  1. 建立connection

  2. 创建channel

  3. 利用channel声明队列

  4. 定义consumer的消费行为handleDelivery()

  5. 利用channel将消费者与队列绑定

3.发布/订阅

发布订阅的模型如图:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-oJCfKz9C-1683447126021)(assets/image-20210717165309625.png)]

可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:

  • Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
  • Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:
    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key 的队列
    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
  • Consumer:消费者,与以前一样,订阅队列,没有变化
  • Queue:消息队列也与以前一样,接收消息、缓存消息。

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

3.1用bean实例实现Fanout类型的发布/订阅模型

导入依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3.1.1在consumer中创建一个类,声明队列和交换机:

package cn.itcast.mq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {
    /**
     * 声明交换机
     * @return Fanout类型交换机
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("huang.fanout");
    }

    /**
     * 第1个队列
     */
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    /**
     * 第2个队列
     */
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

3.1.2.消息发送

在publisher服务的SpringAmqpTest类中添加测试方法:

@Test
public void testFanoutExchange() {
    // 队列名称
    String exchangeName = "huang.fanout";
    // 消息
    String message = "hello, everyone!";
    rabbitTemplate.convertAndSend(exchangeName, "", message);
}

3.1.3.消息接收

在consumer服务的SpringRabbitListener中添加两个方法,作为消费者:

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
    System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}

@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
    System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}

3.2.基于注解实现Direct类型的发布/订阅模式

3.1.1.consumer注解来声明队列和交换机

基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。

在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),
    exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
    System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue2"),
    exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
    System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
}

3.1.2.消息发送

在publisher服务的SpringAmqpTest类中添加测试方法:

@Test
public void testSendDirectExchange() {
    // 交换机名称
    String exchangeName = "itcast.direct";
    // 消息
    String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "red", message);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/498939.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

链表中倒数第k个结点

描述 输入一个链表&#xff0c;输出该链表中倒数第k个结点。 示例1 输入&#xff1a; 1,{1,2,3,4,5} 复制返回值&#xff1a; {5}看代码 struct ListNode* FindKthToTail(struct ListNode* pListHead, int k ) {struct ListNode* fast pListHead, *slow pListHead;whi…

Redis实现分布式锁详细解读

文章目录 什么是分布式锁&#xff1f;如何用Redis实现分布式锁&#xff1f;分布式锁的改进锁过期处理集群环境下Redis宕机问题RedLock的引入RedLock的实现步骤RedLock带来的弊端 什么是分布式锁&#xff1f; 我们在学多线程的时候遇到过ReetrantLock&#xff0c;这种锁主要应用…

node install编译失败原因

关键报错信息 npm ERR! gyp verb check python checking for Python executable "python2.7" in the PATH npm ERR! gyp verb which failed Error: not found: python2.7 或者 npm ERR! code ERESOLVE npm ERR! ERESOLVE could not resolve npm ERR! npm ERR! Whi…

车载以太网时间同步之EthTsync

车载以太网时间同步之EthTsync 前言 首先&#xff0c;请问大家几个小小问题&#xff0c;你清楚&#xff1a; 你知道EthTsync模块的主要作用是什么吗&#xff1f;EthTsync模块与其他AUTOSAR基础软件模块交互关系&#xff1b;Eth Tsync模块使用的时间同步协议是什么&#xff1f…

Java—JDK8新特性—函数式接口

目录 函数式接口 3.1 什么是函数式接口 3.2 functionalinterface注解 源码分析 3.3 Lambda表达式和函数式接口关系 3.4 使用函数式接口 函数式接口 3.1 什么是函数式接口 如果一个接口中只包含一个抽象方法&#xff0c;这个接口称为函数式接口 如果一个接口包含&#xff0…

mac php8 安装xdebug模块失败

安装 xdebug 模块,官网有详细介绍Xdebug: Documentation Installation 本机是mac php使用brew安装,想着可以直接使用以下方式安装,还是美滋滋的 但是安装途中发生了错误 PHP Warning: mkdir(): File exists in /usr/local/Cellar/php/8.0.10/share/php/pear/System.php on…

解决报错ERROR: No matching distribution found for torchvision==0.11.2+cu111

目录 一、猜测 二、验证 三、解决方案 四、检验 该报错是在按官网方法用指令&#xff1a; pip install torch1.9.1cu111 torchvision0.10.1cu111 torchaudio0.9.1 -f https://download.pytorch.org/whl/torch_stable.html 安装pytorch时出现的&#xff0c;以下是分析&#…

粗糙集属性约简方法与Python实现【1】

1. 方法概述 1.1 定义 粗糙集是波兰理工大学Z.pawlak教授提出用来研究不完整数据,不精确知识的表达、学习,归纳等的一套理论。它是一种新的处理模糊和不确定性问题的数学工具,已被广泛应用于知识发现、机器学习、决策支持、模式识别、专家系统及归纳推理等领域。 粗糙集理…

开源相亲小程序

此项目目前已完成前台开发。源码结构清晰&#xff0c;完美实现模块化组件化思想&#xff0c;易维护。 曾经&#xff0c;作者也为寻求自己的另一半苦恼&#xff0c;因为平时工作繁忙&#xff0c;交际圈窄小&#xff0c;而父母又各种催婚&#xff0c;无奈上了“XX网”去碰碰运气。…

webpack5搭建react框架-antd组件库使用

antd组件库使用 一、前言 前面已经完成了webpack5 react框架的配置搭建&#xff0c;我们在进行项目开发的时候大多还会使用第三方的组件库&#xff0c;而antd组件库在react项目中使用是非常非常多的&#xff0c;所以就将react框架使用最多的antd组件库引入并使用。 二、ant…

京东给了兄弟姐妹们稳稳的幸福

“今天我看了宿舍楼&#xff0c;我真的是气得想打人&#xff1b;我原来一直说的是高级单身公寓&#xff01;可实际情况呢&#xff1f;我说了多少遍了&#xff0c;要让员工、让兄弟们活的有尊严。而我们宿迁分公司的管理层是怎么做的呢&#xff0c;说难听的就是没有把员工当人去…

低代码平台的多租户SAAS系统实战解决方案—JeecgBoot

JeecgBoot免费低代码平台&#xff0c;提供一键切换多租户模式机制&#xff01;快速实现全系统的saas租户方案&#xff0c;通过租户ID进行数据隔离。 租户设计思路 1、开启全系统租户隔离 开启方法 将 org.jeecg.config.mybatis.MybatisPlusSaasConfig#OPEN_SYSTEM_TENANT_CO…

RabbitMQ --- 消息可靠性

消息队列在使用过程中&#xff0c;面临着很多实际问题需要思考&#xff1a; 一、消息可靠性 消息从发送&#xff0c;到消费者接收&#xff0c;会经理多个过程&#xff1a; 其中的每一步都可能导致消息丢失&#xff0c;常见的丢失原因包括&#xff1a; 发送时丢失&#xff1a; …

(java)继承和多态 (详解)

目录 1 继承 1.1为什么需要继承 1.2 继承概念 1.3 继承的语法 1.4 父类成员访问 1.4.1 子类中访问父类的成员变量 1.4.2 子类中访问父类的成员方法 1.5 super关键字 1.6 子类构造方法 1.7 super和this 1.7.1 this 1.7.2 super和this 1.8 再谈初始化 1.9 继承方…

软考信管高级——人力资源管理

人力资源管理内容 人力资源管理计划 内容&#xff1a; 角色与职责&#xff1a;定义项目所需的岗位、技能和能力项目组织图&#xff0c;说明项目所需的人员数量人员配备管理计划&#xff0c;说明需要每个团队的时间段以及有助于项目团队参与的其他重要信息 成功的项目团队的特…

Linux安装java jdk

1、检查系统中jdk 版本&#xff1a;java -version 2、检测 jdk 安装包&#xff1a;rpm -qa | grep java 3、卸载 openjdk rpm -e --nodeps tzdata-java-2017b-1.el7.noarch rpm -e --nodeps java-1.8.0-openjdk-1.8.0.131-11.b12.el7.x86_64 rpm -e --nodeps java-1.8.0-open…

深度学习细节总结

计算机视觉 目标检测&#xff0c;语义分割&#xff0c;目标分类 自然语言处理NLP 数据结构 数据结构 访问元素 线性回归 可以看成是一个单层的神经网络&#xff0c;有显式的解 优化算法 梯度下降&#xff0c;超参数&#xff1a;学习率、批量大小 分类回归 单层感知机…

RuntimeError:cuDNN error:CUDNN_STATUS_EXECUTION_FAILED

背景 最近在服务器上跑Deeplabv3进行语义分割时&#xff0c;需要使用GPU版的pytorch。 我在Anaconda下配置了适配服务器CUDA的pytorch&#xff0c;但是报错如下&#xff0c;&#xff08;下图无限接近于我的错误&#xff0c;但是我忘记截图我的报错了&#xff0c;所以用了下面这…

魔百盒CM211-1S_ZG_增强版2+16_当贝纯净版桌面-卡刷固件包

魔百盒CM211-1S_ZG_增强版216_当贝纯净版桌面-卡刷固件包-内有教程-华为鸿蒙动画 特点&#xff1a; 1、适用于对应型号的电视盒子刷机&#xff1b; 2、开放原厂固件屏蔽的市场安装和u盘安装apk&#xff1b; 3、修改dns&#xff0c;三网通用&#xff1b; 4、大量精简内置的…

图灵java学习

反汇编 最后一行 效果是 程序计数器&#xff0c;保存下一个指令的地址 iadd. int加法 动态链接 Java动态链接&#xff08;Dynamic Linking&#xff09;是Java中的一种运行时特性&#xff0c;它允许在应用程序运行时动态地链接和加载库和组件。在编译时&#xff0c;Java程序不需…