【消息队列】RabbitMQ五种消息模式

news2024/11/18 11:17:11

RabbitMQ

  • RabbitMQ
    • RabbitMQ安装
  • 常见的消息模型
    • 基本消息队列
    • SpringAMQP
    • WorkQueue
    • 消息预取
    • 发布订阅模式
      • Fanout Exchange
      • DirectExchange
      • TopicExchange
    • 消息转换器

RabbitMQ

RabbitMQ是基于Erlang语言开发的开源消息通信中间件
官网地址:https://www.rabbitmq.com/

RabbitMQ安装

我们在Centos虚拟机中使用Docker来安装

  1. 下载镜像,在线拉取
    docker pull rabbitmq
  2. 安装MQ
docker run\
--env RABBITMQ_DEFAULT_USER=itcast \  # 设置环境变量用户名
--env RABBITMQ_DEFAULT_PASS= \  # 设置环境变量密码
--name mq \   # 队列名称
--hostname mq1 \  #配置主机名
-p 15672:15672 \  # MQ管理端口
-p 5672:5672 \   #MQ消息传输端口
-d \   # 后台运行
rabbitmq

在这里插入图片描述
在这里插入图片描述

交换机的创建与消息的发送由虚拟主机来完成,每个用户的虚拟主机是相互隔离的

在RabbitMQ中:
channel:操作MQ的工具
exchange:路由消息到队列中
queue:缓存消息
virtual host:虚拟主机,是对queue,exchange等资源的逻辑分组

常见的消息模型

  1. 基本消息队列
  2. 工作消息队列

这两种并没有用到交换机,而是直接到达队列

  1. 发布订阅(Publish,Subscribe),根据交换机类型不同分为三种:
    Fanout Exchange:广播
    Direct Exchange:路由
    Topic Exchange:主题

基本消息队列

publisher:消息发布者,将消息发送到队列queue
queue:消息队列,负责接收并缓存消息
consumer:订阅队列,处理队列中的消息

java模型(消息发布者)

@Test
public void test() throws IOException,TimeoutException{
    //1.建立连接,与消息队列进行连接
    ConnetionFactory factory =new ConnetionFactory();
    //设置连接参数,主机名,端口号,vhost,用户名,密码
    factory.setHost(192.168.75.136);
    factory.setPort(5672);
    factory.setVirtualHost("/");
    factory.setUsername("itcast");
    factory.setPassword("");
    //建立连接
    Connection connection =factory.newConnection();
    //创建通道Channel,就可以向队列发送消息了
    Channel channel =connection.createChannel();
    //创建队列
    String queuename="hlh";
    channel.queueDeclare(queuename,false,false,false,null);
    //发送消息
    String message="hello";
    channel.basicPublish("",queuename,null,message.getBytes());
    //关闭通道和连接
    channel.close();
    connection.close();
}

java模型(消息消费者)

    //1.建立连接,与消息队列进行连接
    ConnetionFactory factory =new ConnetionFactory();
    //设置连接参数,主机名,端口号,vhost,用户名,密码
    factory.setHost(192.168.75.136);
    factory.setPort(5672);
    factory.setVirtualHost("/");
    factory.setUsername("itcast");
    factory.setPassword("");
    //建立连接
    Connection connection =factory.newConnection();
    //创建通道Channel,就可以向队列发送消息了
    Channel channel =connection.createChannel();
    //创建队列
    String queuename="hlh";
    channel.queueDeclare(queuename,false,false,false,null);
    //订阅消息
    channel.basicConsume(queuename,true,new DefaultConsumer(channel){
    @Override
    //处理消息的代码,绑定函数,有了消息才执行
    public void handleDelivery(String consumerTag,Envelope envelope,
                               AMQP.BasicProperties properties,byte[] body)throws IOException{
                  //处理消息
                  String message=new String(body);             
               }
    })

注意:上边生产者消费者都创建了队列:

这是为了防止消息队列中的队列不存在,在进行消息队列初始化的时候不知道是先建立消费者,还是先建立生产者,所以都执行创建函数,但是创建的队列只有一个不会重复

SpringAMQP

  • AMQP

是用于在应用程序或之间传递业务消息的开放标准,该协议与语言和平台无关,更符合微服务中的独立性的要求

  • Spring AMQP

Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息,包含两部分,其中Spring-amqp是基础抽象,spring-rabbit是底层的默认实现

  • 特征:
  1. 监听器容器,用于异步处理入站消息
  2. 用于发送和接收消息的RabbitTemplate
  3. Rabbitadmin用于自动声明队列,交换和绑定
  • 使用:
  1. 引入spring-amqp的依赖
    在这里插入图片描述
    在yml中配置mq连接信息:
spring: 
   rabbitmq:
     host: 192.168.75.136 #主机名
     port: 5672 #端口
     virtual-host: / #虚拟主机
     username: itcast #用户名
     password:   #密码
  1. 在生产者服务中利用RabbitTemplate发送消息到hlh.queue这个队列
public class springamqptest{
   @Autowired
   private RabbitTemplate rabbittemplate;
   @Test
   public void test(){
     String queuename="hlh.queue";
     String message="hello";
     rabbittemplate.convertAndSend(queuename,message);
   }
}
  1. 在消费者服务端编写消费逻辑,绑定到hlh.queue这个队列中
@Component
public class SpringrabbitListener {
   @RabbitListener(queues="hlh.queue")
   public void listenSimple(String msg) throws InterruptedException{
     //消费逻辑代码
   }
}

注意:消息一旦消费就会从队列中删除,rabbitmq没有消息回溯功能

WorkQueue

Work queue,工作队列。可以提高消息处理速度,避免队列消息堆积

一个消息队列绑定多个消费者

假设现在生产者每秒循环发送50条消息,此时的消费者怎么处理:

@Component
public class SpringrabbitListener {
   @RabbitListener(queues="hlh.queue")
   public void listenSimple(String msg) throws InterruptedException{
     //消费逻辑代码
   }
      @RabbitListener(queues="hlh.queue")
   public void listenSimple2(String msg) throws InterruptedException{
     //消费逻辑代码
   }
}

通过定义多个消费者进行消费,追上生产者生产的速度,同一个消息只能被一个消费者消费,一旦消费完就会在队列中删除

消息预取

指的每个消费者每次取多少条消息:
可以通过配置进行配置:

spring:
   rabbitmq:
      host: 192.168.75.136
      port: 5672
      virtual-host: /
      username: itcast
      password: 
      listener:
         simple:
           prefecth: 1 #每次只能获取一条消息,处理完才能获得下一个消息

发布订阅模式

发布订阅可以使得同一个消息发送给多个消费者,实现方式是加入了exchange(交换机)
在这里插入图片描述

注意:exchange负责消息路由,而不是存储,路由失败则消息丢失

交换机的作用:

  1. 接收生产者的消息,将消息按照规则路由到与之绑定的队列
  2. 不能缓存消息,路由失败,消息丢失
  3. FanoutExchange的会将消息路由到每个绑定的队列

SpringAMQP提过了声明交换机,队列,绑定关系的API:
在这里插入图片描述

Fanout Exchange

Fanout Exchange 会将所有的消息路由到每一个跟其绑定的queue
在创建配置类,在配置类中进行消息队列绑定交换机

@Configuration
public class FanoutConfig{
    // 声明FanoutExchange交换机
    @Bean
    public FanoutExchange fanoutExchange(){
       return new FanoutExchange("itcast.fanout");
    }
    //声明一个队列
    @Bean
    public Queue fanoutQueue1(){
       return new Queue("fanout.queue1");
    }
    // 绑定队列跟交换机
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){
       return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
}

此时的生产者如何发送消息:

public void test(){
  //给出交换机名称
  String exchangeName="itcast.fanout";
  String message="hello";
  //发送消息
  rabbitTemplate.convertAndSend(exchangeName,"",message);
}

监听者如何收到消息

@RabbitListener(queues="fanout.queue1")
public void listener(String msg){
   //处理得到的消息
}

DirectExchange

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此成为路由模式(routes)

每一个Queue都与Exchange设置一个BindingKey

发布者发送消息时,指定消息的RoutingKey,Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

一个队列可以指定多个Key

我们可以通过 @RabbitListener声明Exchange,Queue,RoutingKey
在消费者方法上注解

@RabbitListener(bindings=@QueueBinding(value=@Queue(name="direct.queue1"),exchange=@Exchange(name="itcast.direct",type=ExchangeTypes.DIRECT),key={"red","blue"}))
public void Listener(String msg){
   //进行消息的处理
}

在生产者生产时:

public void test(){
  //给出交换机名称
  String exchangeName="itcast.fanout";
  String message="hello";
  //发送消息
  rabbitTemplate.convertAndSend(exchangeName,"blue",message);
}

TopicExchange

TopicExchange与路由模式类似,区别在于routingKey必须是多个单词的列表,并且以.分隔
Queue与Exchange指定BindingKey时可以使用通配符:
#:代指0个或多个单词
*:代指一个单词
同样也是使用 @RabbitListener进行声明

@RabbitListener(bindings=@QueueBinding(value=@Queue(name="direct.queue1"),exchange=@Exchange(name="itcast.direct",type=ExchangeTypes.DIRECT),key="hi.#"))
public void Listener(String msg){
   //进行消息的处理
}

生产者生产消息:

public void test(){
  //给出交换机名称
  String exchangeName="itcast.fanout";
  String message="hello";
  //发送消息
  rabbitTemplate.convertAndSend(exchangeName,"hi.now",message);
}

消息转换器

在SpringAMQP的发送方法中,接收消息的类型是Object,也就是我们可以发送任意对象类型的消息,SpringAMQP会帮助我们序列化为字节后发送

Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的,而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化

如果要修改只需定义一个MessageConverter 类型的Bean即可,推荐使用JSON方式完成序列化

  1. 引入jackson的依赖
    在这里插入图片描述
  2. 声明MessageConverter:
@Bean
public MessageConverter jsonMessageConverter(){
   return new Jackson2JsonMessageConverter();
}

这样发送的消息就会使用自定义的转换类型

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1637252.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

python 反引号怎么打

切换到英文输入法模式下。 找到电脑键盘 Esc 下面的键。 敲两下就出现反引号了。

在一台交换机上配置VLAN

实验环境 实验拓扑图结构如图12.12所示,其中PC1和PC3属于VLAN 2,PC2属于 VLAN 3,PC1的IP地址为192.168.0.2/24,PC2的IP地址为192.168.1.2/24,PC3的 IP地址为192.168.0.3/24。 图12.12 需求描述 要求处于相同VLAN中的主…

正版Office-Word使用时却提示无网络连接请检查你的网络设置 然后重试

这是购买电脑时自带的已经安装好的word。看纸箱外壳有office标记,但是好像没有印系列号。 某天要使用。提示:无网络连接请检查你的网络设置。 经过网上高手的提示: 说要勾选勾选ssl3.0、TLS1.0、1.1、1.2。 我的截图 我电脑进去就缺1.2. …

【数据结构】串?在计算机中还有这样一种数据结构

串的基本概念与基本操作 导言一、串的定义及其重要术语1.1 串的定义1.2 串的重要术语1.3 ASCII码值1.4 转义字符 二、串的三要素2.1 串的逻辑结构2.2 串的存储结构2.3 串的运算 结语 导言 大家好,很高兴又和大家见面啦!!! 看到今…

手把手实现一个简约酷美美的版权声明模块

1. 导语 版权声明在很多网站都有用到,出场率还是很高的。所以今天就实现一个属于自己分风格的版权声明模块,技术上采用原生的前端三剑客: HTMLCSSJavaScript(可能会用到) 比如CSDN的版权声明是这样的 2. 需求分析 先看看成品吧,这篇文字结…

初始计算机网络

TCP/IP TCP/IP模型 TCP/IP网络模型:对于不同设备之间的通信,就需要网络通信,而设备是多样性的,所以要兼容多种多样的设备,就协商出了一套通用的网络协议。 TCP/IP分层 这个网络协议是分层的,每一层都有…

向gitee推送项目

步骤很详细,直接上教程 在gitee新建项目并复制链接 在当前项目目录打开git bash 输入以下指令进行初始化 git init配置个人信息 git config --global user.name 你的昵称 git config --global user.email 账号绑定的邮箱 5. 绑定远程仓库地址 git remote add ori…

机器人正反向运动学(FK和IK)

绕第一个顶点可以沿Z轴转动,角度用alpha表示 绕第二个点沿X轴转动,角度为Beta 第三个点沿X轴转动,记作gama 这三个点构成姿态(pose) 我们记第一个点为P0,画出它的本地坐标系,和世界坐标系一样红…

Java面试八股之Java中数组有没有length()方法?String呢?为什么?

Java中数组有没有length()方法?String呢?为什么? 数组: 数组没有名为length()的方法,但有一个属性叫做length。可以通过数组名直接访问这个属性来获取数组的长度(即元素个数)。这是一个整数值&…

3-2 STM32c8t6实现流水灯

实物接线如下: 软件代码 #include "stm32f10x.h" // Device header #include "delay.h" int main(void) {RCC_APB2PeriphClockCmd(RCC_APB2Periph_GPIOA,ENABLE); //开启时钟GPIO_InitTypeDef GPIO_InitStructure;GPIO_Init…

中国移动首批12个智算中心节点投产暨移动云智算产品体系正式发布

4月28日,中国移动在苏州隆重举办了2024中国移动算力网络大会。其中,以“智算凌云 慧联九州”为主题的一体化算力网络新基建论坛上,中国移动重磅发布了首批12个智算中心节点投产和智算产品体系。 中国移动响应国家号召,依托算力网络…

深度学习之基于YOLOv5智慧交通拥挤预警检测系统

欢迎大家点赞、收藏、关注、评论啦 ,由于篇幅有限,只展示了部分核心代码。 文章目录 一项目简介 二、功能三、系统四. 总结 一项目简介 一、项目背景 随着城市化进程的加速和人口规模的不断增长,交通拥挤问题日益严重。传统的交通拥挤预警方…

goget配置多个golang 运行环境

一台主机安装多个golang 运行环境 本环境 windows10 为 基础 mac linux也可以按照此方法操作 背景 开发不同的运维工具会用到不同版本的golang,但是开发者不能一直进行重装来处理 ,因此 需要一个工具进行golang版本的管理 go管理工具介绍 gvm (Go V…

【报错处理】ib_write_bw执行遇到Couldn‘t listen to port 18515原因与解决办法?

要点 要点: ib默认使用18515端口 相关命令: netstat -tuln | grep 18515 ib_write_bw --help |grep port# server ib_write_bw --ib-devmlx5_1 --port88990 # client ib_write_bw --ib-devmlx5_0 1.1.1.1 --port88990现象: 根因&#xff…

蓝桥杯如何准备国赛?

目录 一、赛前准备 1、如何刷题,刷哪些题? 2、记录(主要看个人习惯) CSDN博客 写注释 3、暴力骗分 4、从出题人的角度出发,应该如何骗分 二、赛中注意事项 一、赛前准备 1、如何刷题,刷哪些题&…

24.什么是跨域?解决方案有哪些?

为什么会出现跨域问题 存在浏览器同源策略,所以才会有跨域问题。那么浏览器是出于何种原因会有跨域的限制呢。其实不难想到,跨域限制主要的目的就是为了用户的上网安全。 同源策略导致的跨域是浏览器单方面拒绝响应数据,服务器端是处理完毕…

OPC UA与IEC61499 在分布式智能电网中的应用

储能系统的系统架构 CMC :Cell Management Controller 储能设备中的电池芯包与电池均衡系统构成电池模组,国内的电池芯包通常使用被动均衡技术,被动均衡芯片通常通过SPI 接口连接到CMC 控制器,CMC 以单片机为主构建,具…

C语言——单链表实现数据增删查改

一.前言 嗨嗨嗨,我们又见面了。前面我们已经学习了关于数据结构中的顺序表,今天我们来学习数据结构中的单链表。废话不多说让我们直接开始吧。 二.正文 1.1链表的概念 链表是一种物理存储结构上非连续、非顺序的存储结构,数据元素的逻辑顺…

“云卷数潮”云原生数据库分论坛亮点回顾!

4月29日,2024中国移动算力网络大会“云卷数潮”云原生数据库分论坛在江苏苏州举行。本次论坛不仅是技术交流的盛宴,更是行业发展趋势的风向标。论坛汇聚了众多企业领袖、专家学者及行业精英,共话云原生数据库技术发展,探讨行业最新…

无人机+大载重+长航时:油电混动多旋翼无人机技术详解

多旋翼无人机是一种具有三个及以上旋翼轴的特殊的无人驾驶旋翼飞行器。具有稳定性强、操控简单、勤务性高、价格便宜等优势,因此在市场上的应用非常广泛。此外,利用地面供电的绳系多旋翼通过电缆向多旋翼持续传输电能,可以大大提高多旋翼的空…