RabbitMq深度学习

news2025/1/21 15:46:09

什么是RabbitMq?

RabbitMQ是一个开源的消息队列中间件,它实现了高级消息队列协议(AMQP)。它被广泛用于分布式系统中的消息传递和异步通信。RabbitMQ提供了一种可靠的、可扩展的机制来传递消息,使不同的应用程序能够相互之间进行通信。它支持多种编程语言和平台,并且具有灵活的路由和队列配置选项。

同步调用 

同步调用的优点:

  • 时效性较强,可以立即得到结果

同步调用的问题:

  • 耦合度高

  • 性能和吞吐能力下降

  • 有额外的资源消耗

  • 有级联失败问题

异步调用

好处:

  • 吞吐量提升:无需等待订阅者处理完成,响应更快速

  • 故障隔离:服务没有直接调用,不存在级联失败问题

  • 调用间没有阻塞,不会造成无效的资源占用

  • 耦合度极低,每个服务都可以灵活插拔,可替换

  • 流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件

缺点:

  • 架构复杂了,业务没有明显的流程线,不好管理

  • 需要依赖于Broker的可靠、安全、性能

MQ的种类 

 RabbitMq安装和使用 

 云服务器安装Rabbitmq。

 在docker 中拉去Ribbitmq镜像。

在docker 中运行ribbitmq。

docker run -d -p 5672:5672 -p 15672:15672 -p 25672:25672 --name rabbitmq rabbitmq

 查看rabbitmq的状态。

rabbitmqctl status

接着我们还可以将Rabbitmq的管理面板开启,这样就可以在浏览器上进行实时访问和监控了。 

我们需要先进入rabbitmq容器。

docker exec -it [在docker中对应的ID] [进入容器的路径] #路径一般为/bin/bash

开启rabbitmq的控制面板设置。

rabbitmq-plugins enable rabbitmq_management

打开rabbitmq的控制面板,就是对应的控制面板端口为15672。

账号和密码都是:guest

 消息队列模型

 SpringAMQP

 什么是springAMQP?

Spring AMQP 是一个基于 Spring 框架的 AMQP(高级消息队列协议)的开发框架。它提供了一种简化和抽象化的方式来使用 AMQP,使得在应用程序中使用消息队列变得更加容易。

springAMQP的使用

导入依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

编写发送者

编写applcation.yml文件

spring:
  rabbitmq:
    host: 119.9.212.171 # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: guest # 用户名
    password: guest # 密码

进行测试

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

@RunWith(SpringRunner.class) #如果不加此注解,spring容器无法自动注入RabbitTemplate
@SpringBootTest
public class PublisherTest {
    @Autowired
    RabbitTemplate rabbitTemplate;
    @Test
    public void tess1() {
        String queueName = "queueName";
        String message = "hello, tolen";
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

测试结果为下:

 可能会出现没有队列生成的情况,这是因为@Test无法自动一个 queue,我们手动创建一个即可。

编写消费者

编辑application.yml文件

spring:
  rabbitmq:
    host: 192.168.150.101 # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: test # 用户名
    password: 123456 # 密码

创建消息监听者

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitMqListener {
    @RabbitListener(queues = "queueName")
    public void getMessage(String message) {
        System.out.println("获取的消息是:" + message);
    }
}

直接配置即可,在后续的项目中消费者会监听对应的消息进行操作。

WorkQueue

我们可以对一个消息标签设置多个监听者,并且默认的设置是预取,也就是即使服务模块处理能力差的情况也会分配到相同个数的信息,不能达到能者多劳的效果,为了到达此效果,我们可以在application.yml中进行设置。

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

发布与订阅

FanoutExchange的使用

在消费者模块编写:新建交换机,新建队列,交换机和队列绑定操作。

在配置类中完成上述操作

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MQConfiguration {
    //声明交换机FanoutExchange
    @Bean
    public FanoutExchange fanoutExchange() {
//        设置交换机的名字
        return new FanoutExchange("tolen.fanout");
    }
//    创建一个信息队列1
    @Bean
    public Queue fanoutQueue1() {
        return new Queue("fanout.queue1");
    }
//    创建信息队列2
    @Bean
    public Queue fanoutQueue2() {
         return new Queue("fanout.queue2");
    }
    //将交换机和队列1进行绑定
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
        //绑定队列给对应的交换机
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
    //将交换机和队列2进行绑定
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

在消费者模块中创建两个队列的监听器

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitMqListener {
    @RabbitListener(queues = "fanout.queue1")
    public void getMessage1(String message) {
        System.out.println("消息队列1中获取的消息是:" + message);
    }
    @RabbitListener(queues = "fanout.queue2")
    public void getMessage2(String message) {
        System.out.println("消息队列2中获取的消息是:" + message);
    }

}

接下来不信消息发送模块,这里需要注意的是,此时我们是向对应的交换机发送消息,通过交换机发送消息给两个消息队列。

发送消息的代码为下:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

@RunWith(SpringRunner.class)
@SpringBootTest
public class PublisherTest {
    @Autowired
    RabbitTemplate rabbitTemplate;
    @Test
    public void tess1() {
        String queueName = "queueName";
        String message = "hello, tolen";
        rabbitTemplate.convertAndSend(queueName, message);
    }
    @Test
    public void fanoutTest() {
        String exchangeName = "tolen.fanout";
        String message = "hi, tolen!";
        //routingKey不进行设置
        rabbitTemplate.convertAndSend(exchangeName, "", message);
    }
}

如果不设置routingKey的话,就会默认将消息发送到使用绑定的消息队列上。 

测试结果为下:

交换机状态

监听器接收到的消息 

 DirectExchange

可以设置routingKey,交换机可以向指定的队列发送消息。

配置监听器

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitMqListener {
    //使用注解进行绑定, 不再需要configuration配置
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "directQueue1"),
            exchange = @Exchange(name = "direct"), //默认使用的交换机类型就是directExchange
            key = {"red", "blue"}
    ))
    public void directQueue1(String message) {
        System.out.println("directQueue2:" + message);
    }
    //使用注解进行绑定, 不再需要configuration配置
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "directQueue2"),
            exchange = @Exchange(name = "direct"), //默认使用的交换机类型就是directExchange
            key = {"red"}
    ))
    public void directQueue2(String message) {
        System.out.println("directQueue2:" + message);
    }
}

编写消息发布模块

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class PublisherTest {
    @Autowired
    RabbitTemplate rabbitTemplate;
    @Test
    public void fanoutTest() {
        String exchangeName = "direct";
        String message = "hi, tolen!";
        //设置routingKey
        rabbitTemplate.convertAndSend(exchangeName, "blue", message);
    }
}

测试结果为下:

此时就只有routingKey=blue的监听器才会接收到消息。

TopicExchage

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

通配符规则:

#:匹配一个或多个词

*:匹配不多不少恰好1个词

修改编写监听器的配置

//使用注解进行绑定, 不再需要configuration配置
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "directQueue2"),
            exchange = @Exchange(name = "direct", type = ExchangeTypes.TOPIC), //默认使用的交换机类型就是directExchange
            key = {"#.new"}
    ))
    public void directQueue2(String message) {
        System.out.println("directQueue2:" + message);
    }

只要发送的消息中的routingKey中尾部为新闻的消息全部会被监听。(routingKey使用"."作间隔)

消息转换器

在springboot中默认使用JDK的序列化,为了提高使用性,我们可以使用json转换器。

在消费者和发送者中都导入对应的依赖。

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

在configuration中配置信息转换器。(消费者和发布者都需要配置)

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MQConfiguration {

    @Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

进行测试,在发送一个对象类型的消息。

对应的监听器

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.Objects;

@Component
public class RabbitMqListener {
    //使用注解进行绑定, 不再需要configuration配置
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "directQueue2"),
            exchange = @Exchange(name = "direct"), //默认使用的交换机类型就是directExchange
            key = {"blue"}
    ))
    public void directQueue2(Map<String, String> message) {
        System.out.println("directQueue2:" + message);
    }
}

对应的发送代码

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.LinkedHashMap;
import java.util.Map;

@RunWith(SpringRunner.class)
@SpringBootTest
public class PublisherTest {
    @Autowired
    RabbitTemplate rabbitTemplate;
    @Test
    public void fanoutTest() {
        String exchangeName = "direct";
        Map<String, String> message = new LinkedHashMap<>();
        message.put("name", "tolen");
        message.put("age", "19");
        //设置routingKey
        rabbitTemplate.convertAndSend(exchangeName, "blue", message);
    }
}

测试效果为下:

接收到的数据 。

 消息队列中的数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/943124.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

成都优优聚:美团代运营趋势在哪儿?

美团代运营作为一种经营模式&#xff0c;在当今日益竞争激烈的市场中扮演着重要的角色。它为商家提供了全方位的服务&#xff0c;从市场推广到订单管理&#xff0c;从团购券的制作到售后服务&#xff0c;帮助商家解决了运营中的各种难题。 首先&#xff0c;美团代运营的核心优势…

重磅OpenAI发布ChatGPT企业版本

8月29日凌晨&#xff0c;Open AI官网发布ChatGPT企业版本&#xff01; 企业版简介&#xff1a; ChatGPT企业版提供企业级安全和隐私、无限的高速 GPT-4 访问、用于处理更长输入的更长上下文窗口、高级数据分析功能、自定义选项等等。人工智能可以协助和提升我们工作生活的各个…

DEA创建maven项目,项目结构无src目录和pom.xml文件内容

File > Settings > Build,Execution,Deployment > Build Tools > Maven > Runnner,界面中VM Options 输入框中填入-DarchetypeCataloginternal&#xff0c;重启IDEA&#xff0c;打开即会有src目录和pom.xml文件。 -DarchetypeCataloginternal -DarchetypeCatal…

【校招VIP】产品思维设计之用户需求分析

考点介绍&#xff1a; 理解用户需求时需要我们在看待产品的时候不能以我们创造者的专业身份来看&#xff0c;而需要用同理心&#xff0c;将自己转变为一个产品的典型用户&#xff0c;才能准确挖掘到用户心底最真实的诉求。 『产品思维设计之用户需求分析』相关题目及解析内容可…

新版白话空间统计(26)标准距离

前文再续&#xff0c;书接上一回。 上次我们讲了方向分布工具&#xff0c;这个工具会生成一个标准差椭圆&#xff0c;其中有这样的一句话描述&#xff1a; “短半轴表示数据分布的范围&#xff0c;短半轴越短&#xff0c;表示数据呈现的向心力越明显&#xff1b;反之&#xf…

探索OLED透明屏的优缺点:引领科技未来的革命性突破

OLED透明屏作为一项革命性的创新技术&#xff0c;其令人惊叹的透明度和柔性性能引起了全球范围内的关注。 然而&#xff0c;了解OLED透明屏的优缺点对于我们全面认识其在科技未来中的地位至关重要。 今天&#xff0c;尼伽将深入探讨OLED透明屏的优势和限制&#xff0c;并借助…

耐世特Nexteer EDI解决方案

耐世特Nexteer曾经为美国通用汽车全资子公司&#xff0c;是一家集研发、制造、销售于一体的全球化集团公司。耐世特汽车系统公司是转向系统及相关先进技术的全球供应商。该公司为60多家汽车制造商设计、制造、销售电动助力转向器、液压助力转向器、转向管柱和传动轴产品&#x…

Nacos基础(3)——nacos+nginx 集群的配置和启动 端口开放 nginx反向代理nacos集群

目录 引出nacos集群nginx反向代理nacos集群停止单例nacos准备8848和8858修改cluster.conf配置【配置】修改启动配置文件【配置】开放8858的端口分别以集群方式启动【启动】前端访问查看生产者测试8858nacos nginx反向代理配置代理tcp代理http启动nginx反向代理容器生产者访问测…

C++内存分区模式

内存分区模型 代码区&#xff1a;存放函数体的二进制代码&#xff0c;由操作系统管理的全局区&#xff1a;存放全局变量和静态变量以及常量栈区&#xff1a;由编辑器自动分配释放&#xff0c;存放函数的参数值&#xff0c;局部变量等堆区&#xff1a;由程序员分配和释放&#…

【操作记录】pytorch_geometric安装方法

pytorch_geometric安装方法 github地址 主要不要直接pip install安装&#xff0c;会由于依赖无法安装而失败 点击here手动安装依赖 选择对应的pytorch版本&#xff0c;我的是Win10 Python3.8.3Pytorch1.8.1CUDA10.2 手动下载四个依赖包本地安装&#xff1a; 主要不要直接&am…

高效高精分板的关键驱动装置:PCB分板机切割主轴

随着电子制造业的快速发展&#xff0c;分板机成为了一种必不可少的生产设备&#xff0c;PCB分板机切割主轴作为关键驱动装置&#xff0c;扮演着重要的角色。 PCB分板机切割主轴由电机、轴承、传动系统和控制系统等组成。电机产生的动力通过传动系统传递给主轴&#xff0c;主轴…

项目经理常用工具01

主要工具 表达工具—SCRTV方法 情境 Scene&#xff1a;明确问题&#xff1a;是什么&#xff1f;冲突 Confilict&#xff1a;提出疑问&#xff1a;怎么了?原因 Reason&#xff1a;分析原因&#xff1a;为什么&#xff1f;策略 Tactics&#xff1a;进行决策&#xff1a;怎么办…

运用Python解析HTML页面获取资料

在网络爬虫的应用中&#xff0c;我们经常需要从HTML页面中提取图片、音频和文字资源。本文将介绍如何使用Python的requests库和BeautifulSoup解析HTML页面&#xff0c;获取这些资源。 一、环境准备 首先&#xff0c;确保您已经安装了Python环境。接下来&#xff0c;我们需要安…

李宏毅 2022机器学习 HW2 上分路线

baseline增加concat_nframes &#xff08;提升明显&#xff09;增加batchnormalization 和 dropout增加hidden layer宽度至512 &#xff08;提升明显&#xff09; 提交文件命名规则为 prediction_{concat_nframes}[{n_hidden_layers}{dropout}_bn].csv

【Ubuntu】Ubuntu常用软件部署

1.安装jdk1.8 (1).apt方式安装 1).安装 1.在终端中输入以下命令&#xff0c;以更新软件包列表 sudo apt-get update2.在终端中输入以下命令&#xff0c;以安装JDK 1.8 sudo apt-get install openjdk-8-jdk3.将Java 1.8设置为默认版本。在终端中输入以下命令 sudo update-…

忘记密码-小米机型 其他安卓机型账号锁 设备锁的分析与刷写某第三方解锁包输入“注册码”

重要提示&#xff1a; 博文的主要目的是分析安卓机型账号锁的安全性和解决方法。操作仅限于自己的机型忘记密码 手机号不用过了保修期导致无法通过官方解锁的操作&#xff0c;请勿用于非法途径 在开始前。对于锁的认知可以参考这篇博文 安卓搞机玩机-什么是“锁 ” BL锁 屏幕锁…

基于ssm的大型商场会员管理系统源码和论文

基于ssm的大型商场会员管理系统源码和论文084 开发工具&#xff1a;idea 数据库mysql5.7 数据库链接工具&#xff1a;navcat,小海豚等 技术&#xff1a;ssm 摘 要 进入信息时代以来&#xff0c;很多数据都需要配套软件协助处理&#xff0c;这样可以解决传统方式带来的管…

JavaWeb之一直摆,一直赶

注解&#xff1a; 注解如果设置了参数的话最后设置默认值&#xff0c;不然容易报错&#xff0c;而且在设置默认值的时候&#xff1a; 自定义注解&#xff1a; 元注解: 对其他注解做出注解 常用元注解&#xff1a; Target:用于描述注解的使用范围&#xff1a; //比如这样一个…

checkstyle检查Java编程样式:识别应该被定义为final的类

介绍 总体说明 checkstyle可以使用FinalClass检查应该被定为final的类。如果违反了&#xff0c;就会报违反项&#xff1a; https://checkstyle.sourceforge.io/checks/design/finalclass.html checkstyle规则集文件对FinalClass模块的配置&#xff1a; 哪些类可以被定义fi…

Linux系统运维指南

实验linux操作系统版本为&#xff1a;CentOS-7.6-x86_64-DVD-1810.iso 注意&#xff1a;此文档为讨论性材料&#xff0c;均为个人实验截图及网络收集资源&#xff0c;非终版。 建议安装操作系统的磁盘与存放数据的磁盘分开 系统盘本次配置&#xff1a;50G 生产推荐&…