Java真的不难(五十四)RabbitMQ的入门及使用

news2025/2/27 2:07:13

RabbitMQ的入门及使用

一、什么是RabbitMQ?

MQ全称为Message Queue,即消息队列。消息队列是在消息的传输过程中保存消息的容器。它是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦


二、RabbitMQ与Kafka的全面对比

对比项KafkaRabbitMQ
开发语言scala、Javaerlang
是否支持多租户2.x.x支持支持
是否支持topic优先级不支持支持
是否支持消息全局有序不支持支持
是否支持消息分区有序支持支持
是否有内置监控
是否支持多个生产者支持支持
是否支持多个消费者支持支持
是否支持一个分区多个消费者不支持不支持
是否支持JMX支持不支持
是否支持加密支持支持
消息队列协议支持仅支持自定义协议支持AMQP、MQTT、STOMP协议
客户端语言支持支持支持
是否支持消息追踪不支持支持
是否支持消费者推模式不支持支持
否支持消费者拉模式支持支持
是否支持广播消息支持支持
是否支持消息回溯支持消息回溯,因为消息持久化,消息被消费后会记录offset和timstamp不支持,消息确认被消费后,会被删除
是否支持消息数据持久化支持支持
是否支持流量控制支持支持
是否支持事务性消息支持不支持
元数据管理通过zookeeper进行管理支持
默认服务端口90925672
默认监控端口kafka web console 9000;kafka manager 9000;15672
相对网络开销较小较大
相对内存消耗较小较大
相对cpu消耗较大较小

实际场景选择:

Kafka :
常作为消息传输的数据管道 ,优势主要体现在吞吐量上,虽然可以通过策略实现数据不丢失,严谨性上不如 RabbitMQ,但 kafka保证每条消息最少送达一次,有较小的概率会出现数据重复发送的情况 ,若消息吞吐量极大则Kafka

RabbitMQ:
RabbitMQ金融场景中经常使用 ,常作为交易数据作为数据传输管道, 具有较高的严谨性,数据丢失的可能性更小,具备更高的实时性,和Spring是统一厂商开发,后期支持比较好,目前最流行的,对容错性的处理比较完善

RabbitMQ 支持发布订阅、轮询分发、公平分发、重发、消息拉取
Kafka 不支持重发、事务


三、Linux上安装RabbitMQ

这次安装将RabbitMQ部署在Linux上,可以在电脑本地安装一台Linux,也可以购买云服务器,若购买云服务器,则需要在安全组内把后面要用到的端口给打开!

1、使用Docker安装(最简单的方法)

拉取RabbitMQ:

以下均为Linux指令

#拉取RabbitMQ:
docker pull RabbitMQ

#启动rabbitmq
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq

#查看docker目前在运行的容器,是否有rabbitmq
docker ps

2、图形化安装插件

#进入运行中的容器
docker exec -it 镜像ID /bin/bash

#rabbitmq图形化安装插件
rabbitmq-plugins enable rabbitmq_management

3、WEB页面开启资源监控

#进入容器
docker exec -it rabbitmq /bin/bash

#切到对应目录
cd /etc/rabbitmq/conf.d/

#修改 management_agent.disable_metrics_collector = false
echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf

#退出容器
exit

#重启容器
docker restart rabbitmq

然后在浏览器内输入服务器地址+端口(15672)即可进入WEB管理页面,默认账号密码均为:guest

在这里插入图片描述


四、Docker配置RabbitMQ集群

先停止在运行的MQ

docker stop 运行容器ID
#启动三个容器
docker run -d --hostname rabbitmq01 --name rabbitmqCluster01 -p 15672:15672 -p 5672:5672 -p 1883:1883 -e RABBITMQ_ERLANG_COOKIE='rabbitmqCookie' rabbitmq

docker run -d --hostname rabbitmq02 --name rabbitmqCluster02 -p 15673:15672 -p 5673:5672 -p 1884:1883 -e RABBITMQ_ERLANG_COOKIE='rabbitmqCookie' --link rabbitmqCluster01:rabbitmq01 rabbitmq

docker run -d --hostname rabbitmq03 --name rabbitmqCluster03 -p 15674:15672 -p 5674:5672 -p 1885:1883 -e RABBITMQ_ERLANG_COOKIE='rabbitmqCookie' --link rabbitmqCluster01:rabbitmq01 --link rabbitmqCluster02:rabbitmq02 rabbitmq

#Erlang Cookie 值必须相同,也就是一个集群内 RABBITMQ_ERLANG_COOKIE 参数的值必须相同。因为 RabbitMQ 是用Erlang实现的,Erlang Cookie 相当于不同节点之间通讯的密钥,Erlang节点通过交换 Erlang Cookie 获得认证


#进入第二个容器
docker exec -it rabbitmqCluster02 bash
rabbitmqctl stop_app
rabbitmqctl reset

#加入集群
rabbitmqctl join_cluster --ram rabbit@rabbitmq01
rabbitmqctl start_app
exit


#进入第三个容器
docker exec -it rabbitmqCluster03 bash
rabbitmqctl stop_app
rabbitmqctl reset

#加入集群
rabbitmqctl join_cluster --ram rabbit@rabbitmq01
rabbitmqctl start_app
exit

#主要参数
-p 15672:15672 management 界面管理访问端口
-p 5672:5672 amqp 访问端口
-p 1883:1883 mqtt  访问端口

然后依次运行2、3的步骤安装插件和开启资源监控
上诉15672、15673、15674端口均需要在服务器安全组内开启

完成后即可的WEB页面看到MQ的集群


五、SpringBoot整合使用RabbitMQ

依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

yml文件的配置:

spring:
  rabbitmq:
    username: guest
    password: guest
    virtual-host: /
    host: 通道运行的地址
    port: 5672

生产者配置类Config:

模拟美团外卖下单,正常下单后若马上被接单则被这条信息直接消费,若5秒内没人接单,该订单消息将进入加急派单队列(死信队列)

所以我们需要两台交换机和两个通道

@Configuration
public class mtRabbitConfig {

	//正常通道的交换机
    @Bean
    public FanoutExchange mtExchange(){
        return new FanoutExchange("mt_fanout_exchange",true,false);
    }

	//死信通道的交换机
    @Bean
    public FanoutExchange mtDeadExchange(){
        return new FanoutExchange("mt_fanout_dead_exchange",true,false);
    }

	//正常通道,给消息设计过期时间,超过该时间未被消费,则进入指定的mt_fanout_dead_exchange
    @Bean
    public Queue mtQueue(){
        Map<String,Object> args = new HashMap<>();
        args.put("x-message-ttl",5000);
        args.put("x-dead-letter-exchange","mt_fanout_dead_exchange");
        return new Queue("mt_queue",true,false,false,args);
    }

    @Bean
    public Queue mtDaedQueue(){
        return new Queue("mt_dead_queue",true,false,false);
    }

	//交换机与通道绑定
    @Bean
    public Binding mtBinding(){
        return BindingBuilder.bind(mtQueue()).to(mtExchange());
    }

    @Bean
    public Binding mtDeadBinding(){
        return BindingBuilder.bind(mtDaedQueue()).to(mtDeadExchange());
    }
}

生产者(模拟用户下单):

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //模拟美团订单下单,若5秒不接单(消费),则进入死信队列(加急派单)
    public void mtTakeOutOrder(String name, String food, String number) {
        UUID takeOutId = UUID.randomUUID();
        String orderTime = DateFormat.getDateTimeInstance().format(new Date());
        String exchangeName = "mt_fanout_exchange";
        String takeOutMes = "美团订单编号:" + takeOutId + " " + orderTime + " " + name + " " + food + " " + number;
        String routingKey = "";
        
		//将消息放入通道内
        Object result = rabbitTemplate.convertSendAndReceive(exchangeName, routingKey, takeOutMes);
        System.out.println("配送中心响应:"+result);
    }

测试类:

    @Test
    void mtTakeOutOrder() throws InterruptedException {
         takeOutOrder.mtTakeOutOrder("小张"+i, "麻辣烫", "10086");
 }

消费者(外卖接单中心):

正常接单消费者:

@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "mt_queue", autoDelete = "false"),
        exchange = @Exchange(value = "mt_fanout_exchange", type = ExchangeTypes.FANOUT)))
@Component
public class mtTakeOutDelivery {

    @RabbitHandler
    public String buyTrainTickets(String message) {
        System.out.println("正常美团外卖订单已接单:" + message);
        return "配送中心已接单";
    }
}

加急接单消费者(死信队列内的消息):

@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "mt_dead_queue", autoDelete = "false"),
        exchange = @Exchange(value = "mt_fanout_dead_exchange", type = ExchangeTypes.FANOUT)))
@Component
public class mtDeadTakeOutDelivery {

    @RabbitHandler
    public String buyTrainTickets(String message) {
        System.out.println("加急饿了么外卖订单已接单:" + message);
        return "配送中心已接单";
    }
}

消费者运行后,只要监听的两个通道内有消息,就会被消费


六、RabbitMQ的手动ACK

为了确保消息不会丢失,RabbitMQ支持消息应答。消费者发送一个消息应答,告诉RabbitMQ这个消息已经接收并且处理完毕了。RabbitMQ就可以删除它了。

自动应答就是上面的案例,只要被消费者取出,通道内就会删除这个消息,万一这个消息在消费者那边处理异常,因为通道里已经没用这条消息了,就会出现消息丢失。所以在有些场景需要改为手动应答ACK,就是消费者把这条消息确认处理完毕后,再告诉通道删除消息,若异常,这条消息将返回通道内可以重新处理,这就是手动应答。

还是一样,来配置两个通道:

生产者配置:

@Configuration
public class TestQueueConfig {


    @Bean
    public FanoutExchange TestExchange() {
        return new FanoutExchange("test_exchange", true, false);
    }

    @Bean
    public FanoutExchange TestDeadExchange() {
        return new FanoutExchange("test_dead_exchange", true, false);
    }

    @Bean
    public Queue TestDeadQueue() {
        return new Queue("test_dead_queue", true, false, false);
    }

    @Bean
    public Queue TestQueue() {
        Map<String, Object> args = new HashMap<>();
        //20秒钟未消费转到死信队列
        args.put("x-message-ttl", 5000);
        args.put("x-dead-letter-exchange", "test_dead_exchange");
        return new Queue("test_queue", true, false, false, args);
    }

    @Bean
    public Binding TestBinding() {
        return BindingBuilder.bind(TestQueue()).to(TestExchange());
    }

    @Bean
    public Binding TestDeadBinding() {
        return BindingBuilder.bind(TestDeadQueue()).to(TestDeadExchange());
    }
}

生产者业务代码不变,与上个案例一致也行

消费者配置类:

@Configuration
public class MyselfReceiverConfig {

    @Autowired
    private CachingConnectionFactory cachingConnectionFactory;

    @Autowired
    private MyselfReceiver myselfReceiver;

    @Autowired
    private MyselfDeadReceiver myselfDeadReceiver;

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory);
        container.setConcurrentConsumers(1);
        container.setMaxConcurrentConsumers(10);
        //手动确认
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setQueueNames("test_queue");
        container.setMessageListener(myselfReceiver);
        return container;
    }

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer_Dead(){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory);
        container.setConcurrentConsumers(1);
        container.setMaxConcurrentConsumers(10);
        //手动确认
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setQueueNames("test_dead_queue");
        container.setMessageListener(myselfDeadReceiver);
        return container;
    }
}

正常消费者:

@Component
public class MyselfReceiver implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            byte[] bytes = message.getBody();
            String mes = new String(bytes);
            String substring = mes.replace("\\\"","'");
            System.out.println("正常通道内消息:"+substring);
            //业务主体

            //若业务处理无异常,则回复通道删除消息
            channel.basicAck(deliveryTag,true);
        }catch (Exception e){
            channel.basicReject(deliveryTag,true);
        }
    }
}

死信通道消费者:

@Component
public class MyselfDeadReceiver implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            byte[] bytes = message.getBody();
            String mes = new String(bytes);
            String result = mes.replace("\\\"", "'");
            System.out.println("死信通道内消息:" + result);
            //业务主体

            //若业务处理无异常,则回复通道删除消息
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            //有异常把消息返回通道
            channel.basicReject(deliveryTag, true);
        }
    }
}

这样就完成了手动ACK,若消费者处理没有异常,将使用channel.basicAck(deliveryTag, true);
若出现了异常,将使用channel.basicReject(deliveryTag, true);,此消息将重新进入通道内,这也确保了未消费成功的消息不会出现丢失的情况。


在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/347428.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

redis+token实现登录校验,前后端分离,及解跨域问题的4种方法

目录 一、使用自定义filter实现跨域 1、客户端向服务端发送请求 2、服务端做登录验证了&#xff0c;并生成登路用户对应的token&#xff0c;保存到redis 3、响应&#xff08;报错&#xff09;-----跨域问题 4、解决跨域问题--------服务器端添加过滤器&#xff0c;设置请求…

Mybatis流式游标查询-大数据DB查询OOM查询问题

问题场景Mysql数据处理类型分以下三种com.mysql.cj.protocol.a.result.ResultsetRowsStatic&#xff1a;普通查询&#xff0c;将结果集一次性全部拉取到内存com.mysql.cj.protocol.a.result.ResultsetRowsCursor&#xff1a;游标查询&#xff0c;将结果集分批拉取到内存&#x…

Pytorch入门教程

Pytorch入门教程 Pytorch简介 概念&#xff1a;由Facebook人工智能研究小组开发的一种基于Lua编写的Torch库的Python实现的深度学习库。优势&#xff1a;简洁、上手快、具有良好的文档和社区支持、项目开源、支持代码调试、丰富的扩展库 Pytorch基础知识 1.张量Tensor 分类…

【Ubuntu新手入门2】深度学习环境配置 Anaconda+Pycharm+PyTorch

【Ubuntu新手入门2】深度学习环境配置 AnacondaPycharmpytorch前言安装PyTorch查看cuda版本mobaxterm软件远程连接linux服务器安装安装anaconda安装pycharm安装新环境pytorch前言 本系统&#xff1a;Ubuntu18.04&#xff0c;anaconda最新&#xff0c;Pycharm最新&#xff0c;P…

泛微采知连,为组织提供安全、合规、智能的数字化文控系统

作为市场主体&#xff0c;企业需要建立健全的质量管理体系&#xff0c;并且及时更新&#xff0c;以应对激烈的市场竞争&#xff0c;实现企业可持续发展。 质量体系在很大程度上通过文件化的形式表现出来。《质量管理体系要求》(GB/T19001—2016/ISO9001&#xff1a;2015)标准指…

ESP-IDF:TCP多线程并发服务器

核心代码&#xff1a; 核心思想就是主线程只处理socket监听功能&#xff0c;把数据处理部分分配到不同的线程中去处理。来了一个客户端连接&#xff0c;就分配新的线程去处理该客户端的数据请求。 代码&#xff1a; /多线程并发服务器/ #include <stdio.h> #include …

实用调试技巧【上篇】

&#x1f534;本文章是在 Visual Studio 2022&#xff08;VS2022&#xff09;编译环境下进行操作讲解 文章目录&#x1f973;1. 什么是bug&#xff1f;&#x1f973;2.调试有多重要&#xff1f;2.1. 我们是如何写代码的&#xff1f;2.2.调试是什么&#xff1f;2.3.调试的基本步…

uni-app 消息推送功能UniPush

uni-app 消息推送功能UniPush,这里用的是uni-app自带的UniPush1.0&#xff08;个推服务&#xff09;&#xff0c;所以只针对UniPush1.0介绍实现步骤。 建议查阅的文章&#xff1a; UniPush 1.0 使用指南[2] Unipush 常见问题[3] 当然现在已经出了UniPush2.0&#xff08;HBuilde…

如何编写一个 npm 插件?

提到写 npm 插件&#xff0c;很多没搞过的可能第一感觉觉得很难&#xff0c;无从下手&#xff0c;其实不然。 我们甚至写个简单的 console.log(hello word)&#xff0c;都是可以当成一个插件发布上去的。 其实无从下手的主要难点还是在于你的具体要做的功能逻辑&#xff0c;这…

FPGA纯verilog代码实现sobel 边缘检测,提供2套工程源码和技术支持

目录1、前言2、理论基础3、设计思路和架构4、图像输入5、RGB转灰度6、3x3卷积滑窗获取7、Sobel卷积运算8、FDMA图像缓存9、图像输出10、工程1详解&#xff1a;ov5640输入11、工程2详解&#xff1a;hdmi输入12、上板调试验证并演示13、福利&#xff1a;工程代码的获取1、前言 边…

vue 在线编辑、实时预览的代码交互组件 vue-code-view

文章目录前言实现安装依赖vue.config.js配置main.js 全局注册参数配置新建vue单文件组件库混合使用错误处理前言 vue-code-view是一个基于 vue 2.x、轻量级的代码交互组件&#xff0c;在网页中实时编辑运行代码、预览效果的代码交互组件。 官方手册&#xff1a; Vue Code Vie…

LeetCode 25. K 个一组翻转链表

原题链接 难度&#xff1a;hard\color{red}{hard}hard 题目描述 给你链表的头节点 headheadhead &#xff0c;每 kkk 个节点一组进行翻转&#xff0c;请你返回修改后的链表。 kkk 是一个正整数&#xff0c;它的值小于或等于链表的长度。如果节点总数不是 kkk 的整数倍&#…

vue2配置cesium详细教程

1.简介 网络上现在关于vue配置cesium的教程有很多&#xff0c;包括csdn和掘金等。虽然这些教程在一定意义上提供了开发者如何配置cesium的方法&#xff0c;但是大部分的方法都不切实际&#xff0c;因为每个人的电脑中npm、node、cesium、vue、webpack的版本都基本不一致的&…

汽车直营模式下OTD全流程

概述 随着新能源汽车的蓬勃发展&#xff0c;造车新势力的涌入&#xff0c;许多新能源车企想通过直营的营销模式来解决新能源汽车市场推广速度缓慢问题&#xff0c;而直营模式下OTD&#xff08;Order-To-Delivery&#xff0c;订单-交付&#xff09;全流程的改革创新在这过程中无…

高压放大器在非线性超声传播研究中的应用

实验名称&#xff1a;高压放大器在非线性超声传播研究中的应用研究方向&#xff1a;超声波测试目的&#xff1a;超声波在混凝土中的传播是一个极为复杂的非线性过程。当超声波穿过混凝土材料时&#xff0c;携带了大量有关混凝土内部结构和构造的信息。传统的超声波检测方法虽然…

Android13通知运行时权限

部分应用更新到Android13以上之后&#xff0c;没有横幅(在屏幕上弹出)通知了。 Android 13&#xff08;API 级别 33&#xff09;及更高版本支持用于从应用发送非豁免&#xff08;包括前台服务 [FGS]&#xff09;通知的运行时权限&#xff1a;POST_NOTIFICATIONS。此更改有助于…

死锁(5.1)

死锁 1 死锁的基本概念 1.1 死锁的定义 死锁是发生在一组相互合作或竞争的线程或进程中的一个问题。因此可以定义为&#xff1a;一组竞争系统资源或相互通信的进程相互的“永久”阻塞。若无外力作用&#xff0c;这组进程将永远不能继续执行。 1.2死锁产生的原因进程 &…

第四章 统计机器学习

机器学习&#xff1a;从数据中学习知识&#xff1b; 原始数据中提取特征&#xff1b;学习映射函数f&#xff1b;通过映射函数f将原始数据映射到语义空间&#xff0c;即寻找数据和任务目标之间的关系&#xff1b; 机器学习&#xff1a; 监督学习&#xff1a;数据有标签&#x…

基于Java实现的商品出入库管理系统

基于Java实现的商品出入库管理系统&#xff08;文末附源码&#xff09; 前言 一、出入库管理系统含义介绍&#xff1a; 出入库管理系统是一套利用一物一码技术对仓库货物各环节实施全过程控制管理的系统&#xff0c;可对仓库货物进行入库、出库、货位、批次、保质期等实现一…

DDL 数据定义语言

DDL 数据定义语言 目录概述一、库的管理1、库的创建2、库的修改【一般不修改&#xff0c;容易出现错误】3、库的删除二、表的管理【重要】1、表的创建2、表的修改3、表的删除4、表的复制 【可以跨库复制】练习题概述 数据定义语言 库和表的管理 一、库的管理 创建、修改、删除…