RabbitMQ 消息队列安装及入门

news2025/1/24 17:57:38

市面常见消息队列中间件对比

技术名称吞吐量 /IO/并发时效性(类似延迟)消息到达时间可用性可靠性优势应用场景
activemq万级简单易学中小型企业、项目
rabbitmq万级极高(微秒)极高生态好(基本什么语言都支持)、时效性高、易学适合绝大数的分布式应用
kafka10万 QBS高(毫秒)极高极高吞吐量大、可靠性、可用性、强大的数据流处理能力适合大规模处理数据的场景、比如构建日志手机系统、实时数据传输、事件流收集传输
rocketmq10万 QBS高ms极高极高吞吐量大、可靠性、可用性、可扩展性适用于金融等可靠性要求较高的场景、适合大规模的消息处理。金融、电商、大规模社交
pulsar10万 QBS高ms极高可靠性、可用性很高、新兴(技术架构先进)适合大规模、高并发的分布式系统(云原生)适合实时分析、事件流处理、物联网数据处理。

RabbitMQ 

RabbitMQ 是基于 AMQP 高级消息队列协议的。

 实际使用可根据官方文档的 demo 。

官方文档:RabbitMQ Tutorials | RabbitMQ

模型

生产者:通俗就是发消息的人,比如在外卖软件上点餐的人

消费者:通俗就是处理消息的任务,比如外卖软件上的商家,需要根据顾客的要求制作餐

交换机:负责把消息转发到对应的队列中,比如有外卖的时候,系统给附近的外面小哥派单

队列:存放消息的地方,等待消费者消费,比如商家肯定不是只做一份餐,做好的餐放在一个指定的位置等待外卖小哥来取餐

路由:转发,就是怎么把消息从一个地方转到另一个地方,通常加在交换机和队列之间,比如系统指定某个范围的外卖小哥接这单

安装

1. 首先安装 RabbitMQ,直接官网下载即可,如果下载速度慢,可以换个网络,或者找找有没有国内镜像。(当初我下载的时候找了半天的镜像都是版本比较老的,结果想着挂一晚上下载,结果官网突然就快了,白折腾了。)

官方网站:Installing on Windows | RabbitMQ

国内镜像:Index of rabbitmq-server-local/v3.12.7

一路 next ,傻瓜式安装即可

安装之后检查服务中是否已经运行了。

2. 安装监控面板

在 RabbitMQ 安装目录下的 sbin 目录下的CMD 输入下面的命令

rabbitmq-plugins.bat enable rabbitmq_management

 安装成功:

默认端口号是 5672,webUI 是 15672

在浏览器输入地址打开管理界面:http://localhost:15672

默认账号密码是 guest

注意:1. 安装目录不能是中文,不能有空格等非法字符,否则页面打不开

           2. 如果想要在远程服务器访问 RabbitMQ 管理面板,需要创建管理员账号,比如在宝塔面板使用时宝塔面板提供的 admin账号,地址就是宝塔面板的 IP 

创建账号:access-control | RabbitMQ

入门

依赖引入

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.20.0</version>
</dependency>

单消费者和生产者

一对一的关系

1. 生产者代码

public class SingleProducer {

  private final static String QUEUE_NAME = "hello";
  public static void main(String[] argv) throws Exception {
    //创建连接
    ConnectionFactory factory = new ConnectionFactory();
    //设置了本地连接,如果修改了用户名和密码,需要设置
    /*factory.setPassword();
    factory.setUsername();*/
    factory.setHost("localhost");
    //建立连接、创建频道
    //频道,类似客户端,用于调用server
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    //创建队列,队列持久化,第二份参数设置为 true
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    String message = "Hello World!";
    //发送消息
    channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

  }
}

channel 频道:理解为操作消息队列的 Client,通过 channel 收发消息,提供了和消息对了 server 建立通信的传输方法

channel.queueDeclare 方法参数:

queue:这是一个字符串参数,代表要声明的队列的名称。如果队列不存在,则会自动创建一个新的队列。

durable:这是一个布尔值参数,表示队列是否持久化。如果设置为true,则队列会在服务器重启后仍然存在;如果设置为false,则队列在服务器重启后会被删除。默认值为false。

exclusive:这也是一个布尔值参数,表示队列是否为独占模式。如果设置为true,则只有当前连接可以访问该队列;如果设置为false,则其他连接也可以访问该队列。默认值为false。

autoDelete:这是另一个布尔值参数,表示队列是否自动删除。如果设置为true,则当最后一个消费者取消订阅时,队列将被删除;如果设置为false,则队列将一直存在,直到手动删除或服务器重启。默认值为false。

arguments:这是一个可选参数,用于设置队列的其他属性,比如消息的最大长度、最大优先级等。

channel.basicPublish 参数:

exchange:这是一个字符串参数,代表交换机的名称。如果不需要使用特定的交换机,可以传递一个空字符串("")。交换机是RabbitMQ中用于接收生产者发送的消息并根据绑定规则路由到队列的组件。

routingKey:这也是一个字符串参数,它指定了发布消息的队列。无论通道绑定到哪个队列,最终发布的消息都会包含这个指定的路由键。路由键是用来确定消息应该发送到哪个队列的重要信息。

message:这是要发布的消息本身,通常是字节数组的形式。

properties:这是一个可选参数,用于设置消息的属性,比如消息的优先级、过期时间等。

在使用channel.basicPublish时,需要注意以下几点:

exchange和routingKey不能为空:在AMQImpl类中的实现要求这两个参数都不能为null,否则会抛出异常。

交换机类型:根据不同的需求,可以选择不同类型的交换机,如fanout、direct或topic。每种类型的交换机都有其特定的路由规则。

非命名队列:在某些情况下,比如日志系统,可以使用非命名队列,这样消费者可以接收到所有相关的日志消息,而不是特定的部分。

2. 消费者代码

public class SingleConsumer {

  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    //声明队列,同一个消息队列参数必须一致
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    //定义了如何处理消息
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
      String message = new String(delivery.getBody(), "UTF-8");
      System.out.println(" [x] Received '" + message + "'");
    };
    //接收、消费消息
    channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

  }
}

 channel.basicConsume 参数:

  1. queue:这是一个字符串参数,代表要消费的队列的名称。如果队列不存在,则会抛出异常。
  2. onMessage:这是一个回调函数,当有新的消息到达时会被调用。该函数需要接收两个参数:一个表示消息内容的Delivery对象和一个表示通道的Channel对象。
  3. consumerTag:这是一个可选参数,用于标识消费者。如果没有指定,则会自动生成一个唯一的标识符。
  4. autoAck:这是一个布尔值参数,表示是否自动确认消息。如果设置为true,则在消息被处理后会自动发送确认信息;如果设置为false,则需要手动发送确认信息。默认值为false。
  5. arguments:这是一个可选参数,用于设置消费者的其他属性,比如消息的最大长度、最大优先级等。

在使用channel.basicConsume时,需要注意以下几点:

  1. 队列名称:队列名称应该是唯一的,否则会抛出异常。
  2. 消息处理:在onMessage回调函数中,需要对消息进行处理,并根据需要发送确认信息。
  3. 消费者标识符:可以通过设置consumerTag来标识消费者,以便在后续操作中进行识别和管理。
  4. 消费者属性:可以通过设置消费者的其他属性来控制消费者的行为,比如设置消息的最大长度、最大优先级等。

多消费者

多个消费者,比如一个工厂生产商品,一个商店卖不完,分给多个商店一起卖

生产者代码和上面一样


public class MultiProducer {

    private static final String TASK_QUEUE_NAME = "multi_queue";

    public static void main(String[] argv) throws Exception {
        //创建连接
        ConnectionFactory factory = new ConnectionFactory();
        //设置本地连接
        factory.setHost("localhost");
        //创建队列,创建频道,类似客户端
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            //队列持久化
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
            //设置消息
            Scanner scanner = new Scanner(System.in);
            while(scanner.hasNext()){
                //输入消息
                String message = scanner.nextLine();
                //发送消息
                channel.basicPublish("", TASK_QUEUE_NAME,
                        MessageProperties.PERSISTENT_TEXT_PLAIN,
                        message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + "'");
            }

        }
    }
}

控制处理任务的积压数,最多同时处理任务数

channel.basicQos(1); //最多处理1个

消息确认机制

ack 确认、nack 消息失败、reject 拒绝

当消息拿走之后会有一个确认机制,保证消息成功被消费。当消费者接收消息会给一个反馈,确认消息的状态,成功消息才会被移除。

支持配置 autoack ,建议修改为 false,根据实际情况手动确认。

//手动确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
//手动拒绝
channel.basicNack(delivery.getEnvelope().getDeliveryTag(),false,false);

 消费者代码

public class MultiConsumer {

    private static final String TASK_QUEUE_NAME = "multi_queue";

    public static void main(String[] argv) throws Exception {
        //创建连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        for (int i = 0; i < 2; i++) {
            final Connection connection = factory.newConnection();
            final Channel channel = connection.createChannel();
            //队列持久化,参数要一致
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
			//控制处理任务的积压数,最多同时处理任务数
            channel.basicQos(1);
            //定义了如何处理消息
            int finalI = i;
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                try {
                    //处理工作的逻辑
                    System.out.println(" [x] Received '" +"消费者:" + finalI + " 消息:"+ message + "'");
                    //睡一定时间,模拟机器处理能力有限
                    Thread.sleep(20000);
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    System.out.println(" [x] Done");
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            };
            //接收消息,消费消息,开启消息监听
            channel.basicConsume(TASK_QUEUE_NAME, false , deliverCallback, consumerTag -> {
            });
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1688605.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

告别付费!这款开源软件让你免费看高清电视直播!

文章目录 📖 介绍 📖🏡 演示环境 🏡📝 开源详情 📝🎯 软件介绍🚀 软件特点🎈 获取方式 🎈⚓️ 相关链接 ⚓️📖 介绍 📖 🔮 揭秘一款神奇的软件,让你轻松畅游电视直播的海洋,无需付费,无需繁琐设置,即可畅享海量高清节目!想要知道它是什么吗?跟…

如何改变echo在Linux下的输出颜色

文章目录 问题回答常规输出字体加粗斜体字带下划线闪烁效果 参考 问题 我正在尝试使用 echo 命令在终端中打印文本。 我想把文本打印成红色。我该怎么做&#xff1f; 回答 你可以使用 ANSI escape codes 定义控制输出颜色的变量。 ANSI escape codes是一种用于在文本中设置…

Gitee在已有项目基础上创建仓库中遇到的问题和解决

问题一&#xff1a;fatal: remote origin already exists 解释&#xff1a;当前仓库添加了一个名为"origin"的远程仓库配置&#xff0c;此时输入 git remote add origin https://xxx就会提示上面的内容。 解决方案1:移除旧的origin git remote remove origin 解决方案…

pyqt6入门案例

效果预览 hello.ui <?xml version"1.0" encoding"UTF-8"?> <ui version"4.0"><class>Dialog</class><widget class"QDialog" name"Dialog"><property name"geometry"><…

2024年5月份最新独角数卡使用USDT详细小白教程

直观配套视频教程 2024年5月份最新独角数卡安装及USDT使用详细小白教程 1、创建服务器 Centos或者Ubuntu2、宝塔面板开心版安装寶塔 Linux 面版 8.0.5 開心版 - 2024年1月12日 - 开心专区 - 异次元 - Powered by Discuz!Centos安装命令&#xff08;默认安装是 8.0.1 直接在线升…

PyMySQL:连接Python与MySQL的桥梁

系列文章目录 更新ing... MySQL操作全攻略&#xff1a;库、表、数据、事务全面指南深入探索MySQL SELECT查询&#xff1a;从基础到高级&#xff0c;解锁数据宝藏的密钥MySQL SELECT查询实战&#xff1a;练习题精选&#xff0c;提升你的数据库查询技能PyMySQL&#xff1a;连接P…

代码随想录算法训练营第三十四天 | 理论基础、455.分发饼干、376、摆动序列、53.最大子序和

目录 理论基础 455.分发饼干 思路 代码 376.摆动序列 思路 代码 53.最大子序和 思路 代码 理论基础 代码随想录 455.分发饼干 代码随想录 思路 可以是大饼干优先满足大胃口&#xff0c;也可以是小饼干优先满足小胃口。 代码 class Solution:def findContentChildre…

【深度学习】与【PyTorch实战】

目录 一、深度学习基础 1.1 神经网络简介 1.2 激活函数 1.3 损失函数 1.4 优化算法 二、PyTorch基础 2.1 PyTorch简介 2.2 张量操作 2.3 构建神经网络 2.4训练模型 2.5 模型评估 三、PyTorch实战 3.1 数据加载与预处理 3.2 模型定义与训练 3.3 模型评估与调优 3…

与WAF的“相爱相杀”的RASP

用什么来保护Web应用的安全&#xff1f; 猜想大部分安全从业者都会回答&#xff1a;“WAF&#xff08;Web Application Firewall,应用程序防火墙&#xff09;。”不过RASP&#xff08;Runtime Application Self-Protection&#xff0c;应用运行时自我保护&#xff09;横空出世…

java操作Redis缓存设置过期时间

如何用java操作Redis缓存设置过期时间&#xff1f;很多新手对此不是很清楚&#xff0c;为了帮助大家解决这个难题&#xff0c;下面小编将为大家详细讲解&#xff0c;有这方面需求的人可以来学习下&#xff0c;希望你能有所收获。 在应用中我们会需要使用redis设置过期时间&…

android studio接入facebook踩坑1

今天在接入facebook第三方登录的时候&#xff0c;点击登录按钮&#xff0c;APP闪退&#xff0c;并报错 java.lang.RuntimeException Failure delivering result ResultInfo{whonull,request64206,result-1} 新文章链接https://lengmo714.top/facebook1.html 如下图&#xff1a;…

网络传输层

叠甲&#xff1a;以下文章主要是依靠我的实际编码学习中总结出来的经验之谈&#xff0c;求逻辑自洽&#xff0c;不能百分百保证正确&#xff0c;有错误、未定义、不合适的内容请尽情指出&#xff01; 文章目录 1.端口号的基础2.传输层两协议2.1.UDP 协议2.1.1.协议结构2.1.2.封…

【Redis7】Redis持久化机制之RDB

文章目录 1.RDB简介2.RDB配置触发设置3.RDB的优缺点4.如何检查修复RDB文件5.如何禁用RDB6.RDB参数优化7.总结 1.RDB简介 Redis持久化机制中的RDB&#xff08;Redis Database&#xff09;是一种将Redis在某个时间点的数据以快照形式保存到磁盘上的方法。 原理&#xff1a;RDB通…

期货交易的雷区

一、做自己看不懂的行情做交易计划一样要做有把握的&#xff0c;倘若你在盘中找机会交易&#xff0c;做自己看不懂的行情&#xff0c;即便你做进去了&#xff0c;建仓时也不会那么肯定&#xff0c;自然而然持仓也不自信&#xff0c;有点盈利就想平仓&#xff0c;亏损又想扛单。…

Go 和 Delphi 定义可变参数函数的对比

使用可变参数函数具有灵活性、重用性、简化调用等优点&#xff0c;各个语言有各自定义可变参数函数的方法&#xff0c;也有通用的处理方法&#xff0c;比如使用数组、定义参数结构体、使用泛型等。 这里总结记录一下 go、delphi 的常用的定义可变参数函数的方式&#xff01; 一…

句柄降权绕过CallBacks检查

看到前辈们相关的文章&#xff0c;不太明白什么是句柄降权&#xff0c;于是专门去学习一下&#xff0c;过程有一点波折。 句柄降权 什么是句柄 当一个进程利用名称来创建或打开一个对象时&#xff0c;将获得一个句柄&#xff0c;该句柄指向所创建或打开的对象。以后&#xf…

前端自动将 HTTP 请求升级为 HTTPS 请求

前端将HTTP请求升级为HTTPS请求有两种方式&#xff1a; 一、index.html 中插入meta 直接在首页 index.html 的 head 中加入一条 meta 即可&#xff0c;如下所示&#xff1a; <meta http-equiv"Content-Security-Policy" content"upgrade-insecure-requests&…

metaMIC:无参考错误组装识别和校正宏基因组组装

#环境很重要&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01; conda create -n metaMIC conda activate metaMIC mamba install python3.8 mamba install -c …

redis核心面试题二(实战优化)

文章目录 10. redis配置mysql实战优化[重要]11. redis之缓存击穿、缓存穿透、缓存雪崩12. redis实现分布式session 10. redis配置mysql实战优化[重要] // 最初实现OverrideTransactionalpublic Product createProduct(Product product) {productRepo.saveAndFlush(product);je…

变量命名的艺术:从蛇形到驼峰

新书上架~&#x1f447;全国包邮奥~ python实用小工具开发教程http://pythontoolsteach.com/3 欢迎关注我&#x1f446;&#xff0c;收藏下次不迷路┗|&#xff40;O′|┛ 嗷~~ 目录 一、蛇形命名法的魅力 二、类名和模块名的特殊规则 三、驼峰命名法的魅力与挑战 四、保持…