rabbitmq入门学习

news2024/11/24 5:48:02

写在前面

本文看下rabbit mq的基础概念以及使用。

1:简单介绍

为了不同进程间通信的解耦,出现了消息队列,为了规范消息队列的具体实现,Java制定了jms规范,这是一套基于接口的规范,因此是绑定语言的,即只能通过Java语言来实现和使用,与jms类似还有基于net的nms,这也是一套规范接口,只不过是基于.net开发语言的。不管是jms还是nms,它们都有一个通病,就是无法实现跨语言,这个时候amqp就应用而生了,可以将其理解为一种应用层的协议,构建在tcp之上,因此就可以实现跨语言的消息通信,参考下图:
在这里插入图片描述

amqp协议通信模型如下:
在这里插入图片描述

2:基础环境准备

2.1:服务安装

参考docker安装rabbitmq 。

2.2:创建Virtual host和用户

  • 创建virtual host
    在这里插入图片描述
  • 创建admin用户
    在这里插入图片描述
  • 设置admin权限
    在这里插入图片描述
    添加成功:
    在这里插入图片描述

3:正戏

本文主要看其提供的5种队列,如下图:
在这里插入图片描述

3.1:简单队列

简单队列就是一个生产者一个消费者的队列方式,如下图:
在这里插入图片描述

  • 生产消息:
public class SimpleSend {

    private final static String QUEUE_NAME = "q_test_01";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        // 从连接中创建通道
        Channel channel = connection.createChannel();
        // 声明(创建)队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 消息内容
        String message = "Hello World come here!!!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        //关闭通道和连接
        channel.close();
        connection.close();
    }
}

生产后:
在这里插入图片描述
点击q_test_01:
在这里插入图片描述

  • 消费消息
public class SimpleRecv {

    private final static String QUEUE_NAME = "q_test_01";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        // 从连接中创建通道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列 true 这里使用自动确认,及消息消费默认消费成功,这种方式效率高,但是容易丢失消息
        // 如果某些场景允许部分消息丢失,但是要求执行效率,则可以考虑将该值设置为true,否则设置为false,即手动确认
        // 最周通过执行channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);完成手动确认,多一次网络通信
        channel.basicConsume(QUEUE_NAME, true, consumer);
        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
        }
    }
}

输出:

[INFO] --- exec-maven-plugin:3.1.0:exec (default-cli) @ rabbitmq-study ---
 [x] Received 'Hello World come here!!!'

在这里插入图片描述

3.2:work队列

一个生产者多个消费者,如下图:
在这里插入图片描述

work队列看起来和简单队列相比只是多起了几个消费者而已。

  • 生产者
public class WorkSend {

    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        for (int i = 0; i < 100; i++) {
            // 消息内容
            String message = "" + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");

            Thread.sleep(i * 10);
        }

        channel.close();
        connection.close();
    }
}
  • 消费者
public class WorkRecv {

    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 同一时刻服务器只会发一条消息给消费者
        // 该值设置为1,配合手动确认,则可以实现一条一条消费,确认一条后消费下一条,且多个消费者时谁的消费能力强,谁消费的消息多
        // 消费者之间消费消息不相互影响
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,false表示手动返回完成状态,true表示自动
        channel.basicConsume(QUEUE_NAME, false, consumer);
//        channel.basicConsume(QUEUE_NAME, true, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [y] Received '" + message + "'");
            //休眠
            Thread.sleep(10);
            // 返回确认状态,注释掉表示使用自动确认模式
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

3.3:订阅模式

这种这种模式可以实现一个消息同时被多个消费者消费(广播),但是具体的实现需要依赖于交换器exchange,生产者端将消息发送到交换器,之后消费者端只需要将某个消息队列绑定到交换器,交换器会将消息发送到绑定的所有队列,消费者端就可以从队列中获取到对应的消息,但需要注意一个队列的消息还是只可以获取一次,如下图们:
在这里插入图片描述

  • 生产者端
public class SubscribeSend {

    private final static String EXCHANGE_NAME = "test_exchange_fanout111";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 消息内容
        String message = "Hello World!";
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        channel.close();
        connection.close();
    }
}

消费者端2个

public class SubscribeRecv {

    private final static String QUEUE_NAME = "test_queue_work1";
    private final static String EXCHANGE_NAME = "test_exchange_fanout111";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);
        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);
        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [Recv] Received '" + message + "'");
            Thread.sleep(10);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

public class SubscribeRecv2 {

    private final static String QUEUE_NAME = "test_queue_work2";
    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);
        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);
        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [Recv2] Received '" + message + "'");
            Thread.sleep(10);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
  • 启动2个消费者
    在这里插入图片描述

  • 启动生产者
    在这里插入图片描述

[INFO] --- exec-maven-plugin:3.1.0:exec (default-cli) @ rabbitmq-study ---
 [Recv2] Received 'Hello World!'

[INFO] --- exec-maven-plugin:3.1.0:exec (default-cli) @ rabbitmq-study ---
 [Recv] Received 'Hello World!'

接着看下转换器和队列的绑定关系:
在这里插入图片描述

注意以下的问题:

1:因为交换器本身不具备数据存储的能力,所以如果是某个交换器上没有绑定任何的队列,则该消息将会丢失。
2:因为交换器本身不存储数据,所以在具体的消息队列绑定到交换器(即消费者启动前),不要生产消息到交换器,否则消息将会丢失。

3.4:路由模式

这种方式类似于订阅模式,也需要转换器作为中间商,但是并不会直接无脑的发送消息,而是会根据消费者额外指定的路由key,生产者在向转换器发送消息时会带着routeKey,消费者在消费消息时会指定自己期望的routeKey只有二者匹配时,才会从队列中消费对应的消息,l另外注意的时交换机类型设置为direct,如下图:
在这里插入图片描述

  • 生产者
public class RoutingSend {

    private final static String EXCHANGE_NAME = "test_exchange_direct123";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明exchange 设置交换器为direct,即路由模式
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // 消息内容
        String message = "Hello World!";
        // 设置路由key为"update",即只有对应队列上设置了update路由key的消费者才会消费到消息
        channel.basicPublish(EXCHANGE_NAME, "update", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        channel.close();
        connection.close();
    }
}
  • 2个消费者
public class RoutingRecv {

    private final static String QUEUE_NAME = "test_queue_work_route";
    private final static String EXCHANGE_NAME = "test_exchange_direct123";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 绑定队列到交换机 设置路由key为delete,update
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);
        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);
        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            Thread.sleep(10);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

public class RoutingRecv2 {

    private final static String QUEUE_NAME = "test_queue_work_route2";

    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);
        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);
        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            Thread.sleep(10);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

在这里插入图片描述
在这里插入图片描述
先启动消费者再启动生产者测试可。
在这里插入图片描述

写在后面

参考文章列表

RabbitMQ使用教程(超详细)

docker安装rabbitmq

RabbitMq的一些概念,JMS、AMQP、MQ 。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1184201.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Xcode15更新内容

参考博客&#xff1a; 【WWDC 2023】Xcode 15 更新内容 文章目录 1. xcode15起&#xff0c;项目内创建的图片可以使用点语法访问2.2. UIKit项目也可以使用预览功能3. Xcode新增标签功能4.Log分类 1. xcode15起&#xff0c;项目内创建的图片可以使用点语法访问 2.2. UIKit项目也…

台达PLC和触摸屏之间无线MODBUS通讯

今天&#xff0c;我们来一起学下下台达PLC与触摸屏之间无线通信的实现方法。其中触摸屏作为主站&#xff0c;台达PLC作为从站&#xff0c;并采用日系PLC专用无线通讯终端DTD435MC&#xff0c;作为实现无线通讯的硬件设备。 台达PLC和触摸屏通讯关键是对于通讯参数的设置。 触…

C++入门学习(4)引用 (讲解拿指针比较)

上期回顾 在学习完函数重载之后&#xff0c;我们可以使用多个重名函数进行操作&#xff0c;会发现C真的是弥补了好多C语言的不足之处&#xff0c;真的不禁感概一下&#xff0c;时代的进步是需要人去做出改变的&#xff0c;而不是一味的使用啊&#xff01;所以我们今天继续学一下…

从白日梦到现实:推出 Elastic 的管道查询语言 ES|QL

作者&#xff1a;George Kobar, Bahubali Shetti, Mark Settle 今天&#xff0c;我们很高兴地宣布 Elastic 的新管道查询语言 ES|QL&#xff08;Elasticsearch 查询语言&#xff09;的技术预览版&#xff0c;它可以转换、丰富和简化数据调查。 ES|QL 由新的查询引擎提供支持&am…

企业如何通过CRM系统赢得客户?

在CRM客户管理系统中&#xff0c;我们可以将培养客户关系简单理解为提供良好的客户体验。这对于企业来说非常重要&#xff0c;不仅可以赢得客户实现成交&#xff0c;更可以塑造口碑&#xff0c;带来更多的新客户。下面我们说说&#xff0c;如何通过CRM系统快速赢得客户&#xf…

【最新版】ChatGPT付费创作系统V2.4.9独立版 +WEB端+ H5端 + 小程序端(支持分享朋友圈)

人类小徐提供的GPT付费体验系统最新版系统是一款基于ThinkPHP框架开发的AI问答小程序&#xff0c;是基于国外很火的ChatGPT进行开发的Ai智能问答小程序。当前全民热议ChatGPT&#xff0c;流量超级大&#xff0c;引流不要太简单&#xff01;一键下单即可拥有自己的GPT&#xff0…

antdv使用a-cascader联级选择器实现自定义浮层样式

一般的使用组件库想要自定义样式都会使用深度选择器deep去实现 但是有的组件不管是deep还是!important还是写行内样式都改不掉 这里主要讲使用a-cascader联级选择器的浮层改变样式 一&#xff0c;使用组件 <a-cascader:options"regionOptions"change-on-selectv…

Valve 近日又为所有支持平台发布了新的 Steam 客户端更新

导读继发布 SteamVR 2.0 之后&#xff0c;Valve 近日又为所有支持平台发布了新的 Steam 客户端更新&#xff0c;其中引入了多项新功能、改进和错误修复&#xff0c;为玩家提供最佳的 Linux 游戏体验。 对于 Linux 玩家来说&#xff0c;新的 Steam Client 更新包括 64 位 openvr…

【SWAT】SWAT-CUP动态基流分割相关说明

说明 SWAT不会在输出.rch文件中打印基流和侧向流。相反,它将它们打印在输出.sub文件中。为了获得基流时间序列,必须从输出中筛选出观测断面上游的所有子流域,必须计算其面积加权平均值(+从mm到m3/s的额外单位转换)。SWAT-CUP动态基流分割并没有计算基流,而是根据提供的动…

JS操作字符串常见方法

目录 一&#xff1a;前言 二&#xff1a;常见的内置方法 1、charAt与charCodeAt 2、indexOf与lastIndexOf 3、substring与substr 4、toLowerCase 和 toUpperCase 5、slice 6、replace 7、split 8、concat 9、trim 10、trimStart / trimLeft 11、trimEnd / trimRigh…

机器学习——逻辑回归

一、分类问题 监督学习的最主要类型 分类&#xff08;Classification&#xff09;&#xff1a; 身高1.85m&#xff0c;体重100kg的男人穿什么尺码的T恤&#xff1f;根据肿瘤的体积、患者的年龄来判断良性或恶性&#xff1f;根据用户的年龄、职业、存款数量来判断信用卡是否会…

【AWS系列】使用 Amazon SageMaker 微调和部署 ChatGLM 模型

前言 大语言模型是一种基于深度学习技术的人工智能模型&#xff0c;可以追溯到早期的语言模型和机器翻译系统。直到最近&#xff0c;随着深度学习技术的崛起&#xff0c;大型预训练语言模型才开始引起广泛的关注。 大语言模型使用大规模的文本数据集进行预训练&#xff0c;从而…

【VSCode】VSCode自定义代码编辑区背景色

// A code block { "editor.fontSize": 16, "editor.mouseWheelZoom": true, "editor.tabSize": 2, "workbench.colorCustomizations": { // 写在 Atom One Light 里面则只对该主题有效 "[Atom One Light]"…

GreenPlum简介

简介 Greenplum是一家总部位于**美国加利福尼亚州&#xff0c;为全球大型企业用户提供新型企业级数据仓库(EDW)、企业级数据云(EDC)和商务智能(BI)提供解决方案和咨询服务的公司&#xff0c;在全球已有&#xff1a;纳斯达克&#xff0c;纽约证券交易所&#xff0c;Skype. FOX&…

第四章:java关键字super

系列文章目录 文章目录 系列文章目录前言一、super关键字二、super 和 this 的比较总结 前言 super关键字可以用于对象访问父类成员。 一、super关键字 super 代表父类的引用&#xff0c; 用于访问父类的属性、 方法、 构造器。 super.属性名 //访问父类的属性&#xff0c;不…

2003-2022年高铁数据高铁开通时间数据

2003-2022年高铁数据高铁开通时间数据 1、时间&#xff1a;2003-2022年 2、指标&#xff1a;高铁站名称、开通时间、所在省份、所在城市、所属线路名称、以及相关备注 3、指标说明&#xff1a; Hsrwsnm[高铁站名称]-高铁站名称 Optm[开通时间]-高铁站开通的时间 Prvn[所在…

java传base64返回给数据报404踩坑

一、问题复现 1.可能因为base64字符太长&#xff0c;导致后端处理时出错&#xff0c;表现为前端请求报400错误&#xff1b; 这一步debug进去发现base64数据是正常传值的 所以排除掉不是后端问题,但是看了下前端请求,猜测可能是转换base64时间太长数据过大导致的404 2.前端传…

【C++】——基础编程

&#x1f383;个人专栏&#xff1a; &#x1f42c; 算法设计与分析&#xff1a;算法设计与分析_IT闫的博客-CSDN博客 &#x1f433;Java基础&#xff1a;Java基础_IT闫的博客-CSDN博客 &#x1f40b;c语言&#xff1a;c语言_IT闫的博客-CSDN博客 &#x1f41f;MySQL&#xff1a…

牵手世界顶尖科学家论坛,五粮液扩大国际高端平台布局

执笔 | 尼 奥 编辑 | 扬 灵 11月6日&#xff0c;以“科学引领变革 重塑世界韧性”为主题的第六届世界顶尖科学家论坛&#xff08;以下简称“顶科论坛”&#xff09;在上海召开。来自25个国家和地区&#xff0c;包括27位诺奖得主在内的100余位海外顶尖科学家、40余位中国两…

OpenCV(opencv_apps)在ROS中的视频图像的应用(重点讲解哈里斯角点的检测)

1、引言 通过opencv_apps&#xff0c;你可以在ROS中以最简单的方式运行OpenCV提供的许多功能&#xff0c;也就是说&#xff0c;运行一个与功能相对应的launch启动文件&#xff0c;就可以跳过为OpenCV的许多功能编写OpenCV应用程序代码&#xff0c;非常的方便。 对于想熟悉每个…