RabbitMQ入门实战

news2024/11/29 12:35:24

文章目录

  • RabbitMQ入门实战
    • 基本概念
    • 安装
    • 快速入门
      • 单向发送
      • 多消费者

RabbitMQ入门实战

官方:https://www.rabbitmq.com

基本概念

AMQP协议:https://www.rabbitmq.com/tutorials/amqp-concepts.html
定义:高级信息队列协议(Advanced Message Queue Protocol)
生产者:发消息到某个交换机
消费者:从某个队列中取信息
交换机(Exchange):负责把消息转发到对应的队列
队列(Queue):存储消息
路由(Routes):将消息从一个地方转发到另一个地方
AMQP模型
image.png

安装

windows 安装:https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.12.0image.png
安装 erlang 25.3.2(因为 RabbitMQ 依赖 erlang,不安装这个安装RabbitMQ会报错),这个语言的性能非常高。
erlang 下载:https://www.erlang.org/patches/otp-25.3.2
image.png
安装完 erlang 后,安装 rabbitmq 即可。
win + r 打开 services.msc(服务菜单),查看 rabbitmq 服务是否已启动:


安装 rabbitmq 监控面板:
在 rabbitmq 安装目录的 sbin 中执行下述脚本:
D:\software\rabbitmq\rabbitmq_server-3.12.0\sbin

rabbitmq-plugins.bat enable rabbitmq_management

image.png
访问:http://localhost:15672,用户名密码都是 guest:
image.png
如果想要在远程服务器安装访问 rabbitmq 管理面板,你要自己创建一个管理员账号,不能用默认的 guest,否则会被拦截(官方出于安全考虑)。

如果被拦截,可以自己创建管理员用户:
参考文档的 Adding a User:https://www.rabbitmq.com/access-control.html

rabbitmq 端口占用:
5672:程序连接的端口
15672:webUI

快速入门

单向发送

一个生产者给一个队列发消息,一个消费者从这个队列取消息,一对一

引入消息队列 Java 客户端:

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>5.17.0</version>
</dependency>

生产者代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class Send {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

Channel频道:理解为操作消息队列的client,提供了和消息队列server建立通信的方法(为了复用连接,提高传输效率)。程序通过channel操作rabbitmq
创建消息队列:
参数:
queueName:消息队列名称(注意,同名称的消息队列,只能用同样的参数创建一次)
durabale:消息队列重启后,消息是否丢失
exclusive:是否只允许当前这个创建消息队列的连接操作消息队列
autoDelete:没有人用队列后,是否要删除队列
执行程序后,可以看到有 1 条消息:

消费者代码:

public class SingleConsumer {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        // 创建连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 创建队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        // 定义了如何处理消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        };
        // 消费消息,会持续阻塞
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

启动消费者后,可以看到消息被消费了

多消费者

官方教程:https://www.rabbitmq.com/tutorials/tutorial-two-java.html
场景:多个机器同时去接受并处理任务(尤其是每个机器的处理能力有限)
一个生产者给一个队列发消息,多个消费者从这个队列中取消息

1)队列持久化
durable参数设置为true,服务器重启后队列不丢失:

channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

2)消息持久化
指定MessageProperties.PERSISTENT_TEXT_PLAIN 参数:

channel.basicPublish("", TASK_QUEUE_NAME,
        MessageProperties.PERSISTENT_TEXT_PLAIN,
        message.getBytes("UTF-8"));

生产者代码:

package com.yupi.springbootinit.mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.util.Scanner;

public class MultiProducer {
    //队列名字
  private static final String TASK_QUEUE_NAME = "multi_queue";

  public static void main(String[] argv) throws Exception {
      //建立连接
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        //创建队列
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.nextLine();
            //在指定的交换机上发布消息到TASK_QUEUE_NAME的队列中,使用 MessageProperties.PERSISTENT_TEXT_PLAIN将消息标为持久化,最后将消息体转换为字节数组
            channel.basicPublish("", TASK_QUEUE_NAME,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
  }

}

消费者代码:

package com.yupi.springbootinit.mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class MultiConsumer {

    private static final String TASK_QUEUE_NAME = "multi_queue";

    public static void main(String[] argv) throws Exception {
        // 建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        final Connection connection = factory.newConnection();
        for (int i = 0; i < 2; i++) {
            final Channel channel = connection.createChannel();

            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            //消费者每次只能处理一个消息
            channel.basicQos(1);

            // 定义了如何处理消息
            int finalI = i;
            // 创建消息接收回调函数,以便接收消息
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                try {
                    // 处理工作
                    System.out.println(" [x] Received '" + "编号:" + finalI + ":" + message + "'");
                    // 停 20 秒,模拟机器处理能力有限
                    Thread.sleep(20000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(" [x] Done");
                    // 手动发送应答,告诉RabbitMQ消息已经被处理
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            };
            // 开始消费消息,传入队列名称,是否自动确认,投递回调和消费者取消回调
            channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> {
            });
        }
    }
}

这样你就可以跑通你的第一个RabbitMQ并且了解了单向发送和多消费者两种方式,下期分享RabbitMQ一个重要的概念—交换机

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1598962.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

LangChain入门:19.探索结构化工具对话

引言 在人工智能的浪潮中&#xff0c;对话代理技术正逐渐成为企业和开发者关注的焦点。LangChain&#xff0c;作为对话代理领域的一颗新星&#xff0c;自2021年9月诞生以来&#xff0c;以其强大的功能和灵活的应用场景迅速赢得了市场的认可。本文将带你深入了解LangChain中的S…

【从浅学到熟知Linux】进程控制上篇=>进程创建、进程终止与进程等待(含_exit与exit的区别、fork函数详解、wait与waitpid详解)

&#x1f3e0;关于专栏&#xff1a;Linux的浅学到熟知专栏用于记录Linux系统编程、网络编程等内容。 &#x1f3af;每天努力一点点&#xff0c;技术变化看得见 文章目录 进程创建fork函数写时拷贝 进程退出进程退出操作系统做了什么&#xff1f;进程退出场景进程退出的常见方法…

openstack修改实例名称但是gnocchi监控数据中实例名称没有变更的问题处理

文章目录 一、问题描述二、调研过程1、变更实例名称2、查看grafana中的监控数据3、libvirt服务中的xml文件4、现有的监控数据流转架构 总结 一、问题描述 openstack修改实例名称但是gnocchi监控数据中实例名称没有变更的问题处理。 通过修改实例名称的功能修改了实例名称&…

大模型赋能:爬虫技术的全新革命

大模型加持下的爬虫技术革新&#xff1a;从BS4到提示工程的飞跃 在爬虫技术的演进历程中&#xff0c;内容解析一直是一个核心环节。传统的爬虫技术&#xff0c;如使用BeautifulSoup&#xff08;BS4&#xff09;等工具&#xff0c;需要逐个解析网页内容&#xff0c;通过XPath或C…

XILINX 7系列时钟资源

文章目录 前言一、时钟概要1.1、CC1.2、BUFR、BUFIO、BUFMR1.3、CMT1.4、BUFH1.5、BUFG 二、时钟路由资源三、CMT 前言 本文主要参考xilinx手册ug472 一、时钟概要 7系列FPGA时钟资源主要有CC、BUFR、BUFIO、BUFMR、CMT、BUFG、BUFH和GTE_COMMON 1.1、CC “CC”&#xff0…

OpenHarmony开发案例:【自定义通知】

介绍 本示例使用[ohos.notificationManager] 等接口&#xff0c;展示了如何初始化不同类型通知的通知内容以及通知的发布、取消及桌面角标的设置&#xff0c;通知类型包括基本类型、长文本类型、多行文本类型、图片类型、带按钮的通知、点击可跳转到应用的通知。 效果预览&am…

TensorFlow实战Google深度学习框架 PDF书籍分享

今天又来给大家推荐一本TensorFlow方面的书籍<TensorFlow实战Google深度学习框架>。本书适用于想要使用深度学习或TensorFlow的数据科学家、工程师&#xff0c;希望了解大数据平台工程师&#xff0c;对人工智能、深度学习感兴趣的计算机相关从业人员及在校学生等。 下载当…

【数据结构与算法】用两个栈实现一个队列

题目 用两个栈&#xff0c;实现一个队列功能 add delete length 队列 用数组可以实现队列&#xff0c;数组和队列的区别是&#xff1a;队列是逻辑结构是一个抽象模型&#xff0c;简单地可以用数组、链表实现&#xff0c;所以数组和链表是一个物理结构&#xff0c;队列是一个逻…

Cannot access ‘androidx.activity.FullyDrawnReporterOwner‘

Android Studio新建项目就报错&#xff1a; Cannot access ‘androidx.activity.FullyDrawnReporterOwner’ which is a supertype of ‘cn.dazhou.osddemo.MainActivity’. Check your module classpath for missing or conflicting dependencies 整个类都报错了。本来原来一直…

阿里面试:DDD中的实体、值对象有什么区别?

在领域驱动设计&#xff08;DDD&#xff09;中&#xff0c;有两个基础概念&#xff1a;实体&#xff08;Entity&#xff09;和值对象&#xff08;Value Object&#xff09;。 使用这些概念&#xff0c;我们可以把复杂的业务需求映射成简单、明确的数据模型。正确使用实体和值对…

【环境】原则

系列文章目录 【引论一】项目管理的意义 【引论二】项目管理的逻辑 【环境】概述 【环境】原则 一、培养项目系统性思维 1.1 系统性思维 1.2 系统性思维的价值 1.3 建模和推演&数字孪生 二、项目的复杂性和如何驾驭复杂性 2.1 复杂性的三个维度 2.2 如何驾驭复杂性 三、…

【御控物联】Java JSON结构转换(4):对象To对象——规则属性重组

文章目录 一、JSON结构转换是什么&#xff1f;二、术语解释三、案例之《JSON对象 To JSON对象》四、代码实现五、在线转换工具六、技术资料 一、JSON结构转换是什么&#xff1f; JSON结构转换指的是将一个JSON对象或JSON数组按照一定规则进行重组、筛选、映射或转换&#xff0…

谷歌pixel6/7pro等手机WiFi不能上网,显示网络连接受限

近期在项目中遇到一个机型出现的问题,先对项目代码进行排查,发现别的设备都能正常运行,就开始来排查机型的问题,特意写出来方便后续查看,也方便其它开发者来自查。 设备机型:Pixel 6a 设备安卓版本:13 该方法无需root,只需要电脑设备安装adb(即Android Debug Bridge…

GPT提示词分享 —— 解梦

&#x1f449; 对你描述的梦境进行解读。 我希望你能充当一个解梦者。我将给你描述我的梦&#xff0c;而你将根据梦中出现的符号和主题提供解释。不要提供关于梦者的个人意见或假设。只提供基于所给信息的事实性解释。 GPT3.5的回答 GPT3.5 &#x1f447; 感觉有点傻&#xf…

申请免费https证书

https证书是什么&#xff1a; https证书是指在http超文本传输协议的前提下安装部署了SSL/TLS证书后形成的全新协议&#xff0c;https安全超文本传输协议。在https证书部署完成后&#xff0c;服务器端和浏览器端进行的信息交互的过程中会有加密层保护&#xff0c;使得原本明文传…

群晖NAS安装Synology Office与Drive结合内网穿透实现本地文件公网分享与远程协作

文章目录 本教程解决的问题是&#xff1a;1. 本地环境配置2. 制作本地分享链接3. 制作公网访问链接4. 公网ip地址访问您的分享相册5. 制作固定公网访问链接 本教程解决的问题是&#xff1a; 1.Word&#xff0c;PPT&#xff0c;Excel等重要文件存在本地环境&#xff0c;如何在编…

【解决】Spring Boot创建项目常见问题

&#x1f3a5; 个人主页&#xff1a;Dikz12&#x1f525;个人专栏&#xff1a;Spring学习之路&#x1f4d5;格言&#xff1a;吾愚多不敏&#xff0c;而愿加学欢迎大家&#x1f44d;点赞✍评论⭐收藏 目录 idea无maven选项 无效发行版17 类⽂件具有错误的版本 61.0, 应为 …

H5获取微信openid封装方法

H5获取微信openid封装方法 目录1、前置配置条件2、封装并新建getOpenid.js文件2.1&#xff1a;处理code方法2.2&#xff1a;第一次获取到openid后&#xff0c;再次进入无需再次获取&#xff1b;2.3&#xff1a;页面调用方法 3、往期回顾总结&#xff1a; 目录 接到需求&#xf…

腾讯EdgeOne产品测评体验—Web安全的攻与防:云端防护一体化

目录 简介接入准备EdgeOne购买及接入服务器环境配置添加测试站点关闭防护 安全性能测试XSS攻击sql注入 站点加速测试代码测试通过在线工具对比测试Ping检测tcping网站测速 HTTPS证书 操作步骤优点 总结EdgeOne的优缺点 简介 EdgeOne&#xff0c;作为腾讯云推出的全新CDN解决方…

使用icpc tool进行滚榜操作

前言 参加ACM的同学都知道&#xff0c;比赛非常有趣的环节就是赛后的滚榜环节&#xff0c;所以为了一个比赛的完整性&#xff0c;自己办比赛时也想要加入滚榜的操作&#xff0c;经过一段时间的研究学习&#xff0c;已经可以将滚榜程序与domjudege程序成功完成融合&#xff0c;…