RabbitMQ消息队列快速入门

news2025/1/17 0:49:26

RabbitMQ消息队列快速入门

初始MQ

MQ全称为Message Queue,即消息队列,是在消息的传输过程中保存消息的容器。它是典型的生产者-消费者模型
生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。消息的生产和消费都是异步的,可以解耦发送者和接收者之间的通信,提高系统的可扩展性和可靠性

技术选型

目比较常见的MQ实现有:

  • ActiveMQ
  • RabbitMQ
  • RocketMQ
  • Kafka
RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般

RabbitMQ

RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:
Messaging that just works — RabbitMQ

安装

基于Docker来安装RabbitMQ

docker pull rabbitmq

运行

docker run \
 -e RABBITMQ_DEFAULT_USER=daybreak \
 -e RABBITMQ_DEFAULT_PASS=123456 \
 --name mq \
 --hostname mq \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq

在安装命令中有两个映射的端口:

  • 15672:RabbitMQ提供的管理控制台的端口
  • 5672:RabbitMQ的消息发送处理接口

运行成功后,访问http://ip:15672,输入username和password即可进入管理控制台。

RabbitMQ架构

  • publisher:生产者,也就是发送消息的一方
  • consumer:消费者,也就是消费消息的一方
  • queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
  • exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
  • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue

Spring AMQP

由于RabbitMQ采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ交互。并且RabbitMQ官方也提供了各种不同语言的客户端。
但是,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:Spring AMQP

SpringAmqp的官方地址:Spring AMQP

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听器模式,异步接收消息
  • 封装了RabbitTemplate工具,用于发送消息

交换机

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

交换机的类型有四种:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。
  • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列。
  • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符。
  • Headers:头匹配,基于MQ的消息头匹配,用的较少。

声明队列和交换机

基于Bean方式声明
package com.itheima.consumer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfiguration {

    /**
     * 声明交换机
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("daybreak.fanout");
    }

    /**
     * 声明队列
     * @return
     */
    @Bean
    public Queue fanoutQueue(){
        return new Queue("fanout.queue");
    }

    /**
     * 绑定队列和交换机
     * @param fanoutQueue3
     * @param fanoutExchange
     * @return
     */
    @Bean
    public Binding FanoutBinding3(Queue fanoutQueue, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
    }
}
基于注解声明

声明Direct模式的交换机和队列:

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue", durable = "true"),
        exchange = @Exchange(name = "daybreak.direct", type = ExchangeTypes.DIRECT),
        key = {"red", "yellow"}
))
public void listenDirectQueue(String msg){
    System.out.println("消费者收到了direct.queue的消息:" + msg);
}

声明Topic模式的交换机和队列:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue"),
    exchange = @Exchange(name = "daybreak.topic", type = ExchangeTypes.TOPIC),
    key = "#.news"
))
public void listenTopicQueue(String msg){
    System.out.println("消费者接收到topic.queue的消息:【" + msg + "】");
}

快速入门

导入依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

添加配置

在application.yml中添加配置:

spring:
  rabbitmq:
    host: 192.168.200.130 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: daybreak # 用户名
    password: 123456 # 密码

配置JSON转换器

Spring的消息发送代码接收的消息体是一个Object:

在数据传输时,它会把发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
然而默认情况下Spring采用的序列化方式是JDK序列化。

JDK序列化存在下列问题:

  • 数据体积过大
  • 有安全漏洞
  • 可读性差

显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。

使用JSON方式序列化需要引入以下依赖:

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

注意,如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。

配置消息转换器,在服务的启动类中添加一个Bean即可:

@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jackson2JsonMessageConverter.setCreateMessageIds(true);
    return jackson2JsonMessageConverter;
}

接收端

package com.itheima.consumer.listeners;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyListener {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "daybreak.queue", durable = "true"),
            exchange = @Exchange(name = "daybreak.direct", type = ExchangeTypes.DIRECT),
            key = "demo"
    ))
    public void listenDirectQueue(String msg){
        System.out.println("消费者收到了direct.queue的消息:" + msg);
    }
}

发送端

package com.itheima.publisher;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class MyPublisher {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void myTest(){
        String exchangeName = "daybreak.direct";
        String routingKey = "demo";
        String msg = "Hello,RabbitMQ";
        rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);
    }
}

启动发送端和接收端后,运行结果如下:

消费者收到了direct.queue的消息:Hello,RabbitMQ

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1235201.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

多项式求和

题目描述 给定程序中 fun 函数的功能是&#xff1a;求出以下分数序列的前 n 项之和&#xff0c;并通过函数值返回 main 函数。 输入格式 输入参数。 输出格式 计算公式返回的结果。 输入输出样例 输入1 5 输出1 8.391667 python解&#xff1a; def fun(n):a1b2s0for…

PyTorch中并行训练的几种方式

❤️觉得内容不错的话&#xff0c;欢迎点赞收藏加关注&#x1f60a;&#x1f60a;&#x1f60a;&#xff0c;后续会继续输入更多优质内容❤️ &#x1f449;有问题欢迎大家加关注私戳或者评论&#xff08;包括但不限于NLP算法相关&#xff0c;linux学习相关&#xff0c;读研读博…

Linux下使用宏定义判断系统架构和系统类型

文章目录 查看编译器当前支持的宏定义查找指定的宏不同架构不同系统 附录-编译器内部常用的一些宏定义宏定义实际应用使用宏定义判断系统架构使用宏定义判断系统类型 一般情况下在linux下做C/C方面的开发不需要太关注系统架构&#xff0c;当然如果涉及到不同架构下的适配问题&a…

『亚马逊云科技产品测评』活动征文|基于Lightsail 使用 html + css 实现圣诞树

授权声明&#xff1a;本篇文章授权活动官方亚马逊云科技文章转发、改写权&#xff0c;包括不限于在 Developer Centre, 知乎&#xff0c;自媒体平台&#xff0c;第三方开发者媒体等亚马逊云科技官方渠道 前言 又快要到今年的圣诞节了&#xff0c;去年看好多小伙伴分享自己的圣…

完美解决:yum -y install nginx 报出 没有可用软件包 nginx。错误:无须任何处理

目录 一、问题&#xff1a; 二、原因&#xff1a; 三、解决方法&#xff1a; 一、问题&#xff1a; [rootlocalhost ~]# yum -y install nginx 已加载插件&#xff1a;fastestmirror Loading mirror speeds from cached hostfile * base: mirrors.bfsu.edu.cn * extras: m…

windows电脑连接Android和iPhone真机调试

windows电脑连接Android和iPhone真机调试 目前用的是Hbuilder X编辑器&#xff0c;在正常情况下&#xff0c;Android手机需要在 "设置 ----> 更多设置 ----->关于手机 ------> 版本号&#xff08;手指点击5-7下即可打开开发者模式&#xff09;"(我的是vivo的…

环境配置|GitHub——如何在github上搭建自己写的网站

下面简单地总结了从本地的网页文件到在github服务器上展示出来即可以通过网络端打开的过程&#xff1a; &#xff08;以下可能会出现一些难点&#xff0c;照着做就可以了&#xff0c;由于笔者是小白&#xff0c;也不清楚具体原理是什么&#xff0c;希望有一天成为大神的时候能轻…

【漏洞复现】IP-guard WebServer 存在远程命令执行漏洞

漏洞描述 IP-guard是由溢信科技股份有限公司开发的一款终端安全管理软件,旨在帮助企业保护终端设备安全、数据安全、管理网络使用和简化IT系统管理。 免责声明 技术文章仅供参考,任何个人和组织使用网络应当遵守宪法法律,遵守公共秩序,尊重社会公德,不得利用网络从事危…

MySQL InnoDB 引擎底层解析(三)

6.3.3. InnoDB 的内存结构总结 InnoDB 的内存结构和磁盘存储结构图总结如下&#xff1a; 其中的 Insert/Change Buffer 主要是用于对二级索引的写入优化&#xff0c;Undo 空间则是 undo 日志一般放在系统表空间&#xff0c;但是通过参数配置后&#xff0c;也可以用独立表空 间…

linux 系统调用流程分析

x86 1.系统调用 系统调用是用户空间程序与内核交互的主要机制。系统调用与普通函数调用不同&#xff0c;因为它调用的是内核里的代码。使用系统调用时&#xff0c;需要特殊指令以使处理器权限转换到内核态。另外&#xff0c;被调用的内核代码由系统调用号来标识&#xff0c;而…

从android.graphics.Path中取出Point点,Kotlin

从android.graphics.Path中取出Point点&#xff0c;Kotlin /*** 从一条Path中获取多少个Point点*/private fun getPoints(path: Path, pointCount: Int): Array<FloatPoint?> {val points arrayOfNulls<FloatPoint>(pointCount)val pm PathMeasure(path, false)…

[Linux] 进程入门

&#x1f4bb;文章目录 &#x1f4c4;前言计算机的结构体系与概念冯诺依曼体系结构操作系统概念目的与定位 进程概念描述进程-PCBtask_struct检查进程利用fork创建子进程 进程状态进程状态查看僵尸进程孤儿进程 &#x1f4d3;总结 &#x1f4c4;前言 作为一名程序员&#xff0c…

HarmonyOS云开发基础认证【最新题库 满分答案】

系列文章 HarmonyOS应用开发者基础认证【闯关习题 满分答案】 HarmonyOS应用开发者基础认证【满分答案】 HarmonyOS云开发基础认证【最新题库 满分答案】 目录 系列文章一、判断题二、单选题三、多选题 一、判断题 1.应用架构的演进依次经历了微服务架构、单体架构、Serverle…

Python (十二) 文件

程序员的公众号&#xff1a;源1024&#xff0c;获取更多资料&#xff0c;无加密无套路&#xff01; 最近整理了一份大厂面试资料《史上最全大厂面试题》&#xff0c;Springboot、微服务、算法、数据结构、Zookeeper、Mybatis、Dubbo、linux、Kafka、Elasticsearch、数据库等等 …

【深度优先搜索遍历算法的实现,广度优先遍历(BFS-Breadth_First Search),构造最小生成树】

文章目录 深度优先搜索遍历算法的实现邻接矩阵表示的无向图深度遍历实现&#xff1a;DFS算法分析 广度优先遍历&#xff08;BFS-Breadth_First Search&#xff09;构造最小生成树 深度优先搜索遍历算法的实现 邻接矩阵表示的无向图深度遍历实现&#xff1a; 实现深度优先遍历的…

MATLAB | 绘图复刻(十三) | 带NaN图例的地图绘制

有粉丝问我地图绘制如何添加NaN&#xff0c;大概像这样&#xff1a; 或者这样&#xff1a; 直接上干货&#xff1a; 原始绘图 假设我们有这样的一张图地图&#xff0c;注意运行本文代码需要去matlab官网下载Mapping Toolbox工具箱&#xff0c;但是其实原理都是相似的&…

俄罗斯网络间谍组织在有针对性的攻击中部署LitterDrifter USB蠕虫

导语 俄罗斯网络间谍组织最近在针对乌克兰实体的攻击中&#xff0c;部署了一种名为LitterDrifter的USB蠕虫。这种蠕虫具有自动传播恶意软件的功能&#xff0c;并与威胁行为者的命令和控制服务器进行通信。该组织被称为Gamaredon&#xff0c;其攻击行动被认为是大规模的&#xf…

level=warning msg=“failed to retrieve runc version: signal: segmentation fault“

安装docker启动后&#xff0c;发现里面没有runc版本信息 目前看是少了runc组件 那我们安装runc https://github.com/opencontainers/runc/releases/download/v1.1.10/runc.amd64 [rootlocalhost ~]# mv runc.amd64 /usr/bin/runc mv&#xff1a;是否覆盖"/usr/bin/runc&q…

基于阶梯碳交易的含P2G-CCS耦合和燃气掺氢的虚拟电厂优化调度matlab程序

微❤关注“电气仔推送”获得资料&#xff08;专享优惠&#xff09; 参考文献&#xff1a; 基于阶梯碳交易的含P2G-CCS耦合和燃气掺氢的虚拟电厂优化调度——陈登勇 主要内容&#xff1a; 以碳交易和碳封存成本、燃煤机组启停和煤耗成本、弃风成本、购气成本之和为目标函数&…

初识树(c语言)

树 定义&#xff1a;树是一种非线性的数据结构&#xff0c;它是由n&#xff08;n>0&#xff09;个有限结点组成一个具有层次关系的集合。 有一个特殊的结点&#xff0c;称为根结点&#xff0c;根节点没有前驱结点 除根节点外&#xff0c;其余结点被分成M(M>0)个互不相交…