分布式消息队列:RabbitMQ(1)

news2025/1/13 3:09:05

目录

一:中间件

二:分布式消息队列 

2.1:是消息队列

2.1.1:消息队列的优势

2.1.1.1:异步处理化

2.1.1.2:削峰填谷

2.2:分布式消息队列

2.2.1:分布式消息队列的优势

2.2.1.1:数据的持久化

2.2.1.2:可扩展性

2.2.1.3:应用解耦

2.2.1.4:发送订阅 

2.2.2:分布式消息队列的应用场景 

三:Rabbitmq

3.1:基本概念

3.2:快速入门 

3.2.1:引入消息队列Java客户端

3.2.2:单消费开发生产者和消费者

 3.2.3:多消费开发生产者和消费者

3.3.3:交换机

3.3.3.1:交换机的类别

a):fanout


一:中间件

连接多个系统,帮助多个系统紧密协作的技术(组件)

二:分布式消息队列 

2.1:是消息队列

概念:存储消息的队列

关键词:存储,消息,队列

存储:存储数据

消息:某种数据结构,比如l字符串,对象,二进制数据,json等

队列:先进先出的数据结构

作用:在不同的系统下,应用之间实现消息的传输,不需要考虑传输应用的编程语言,系统和,框架等等,实现应用解耦的作用。

        eg:可以让Java开发的应用发消息,让php开发的应用收消息。

 针对生产者来说:不需要关心消费者什么时候接受消息,什么时候消费,我只需要把我的工作完成就好了。生产者和消费者之间实现了解耦。

针对上图,同样我们会发现,当小李要别的书籍的时候,小王也可以将别的书籍放到消息队列中。生产者和消费者从某一种程度上实现了解耦合。

2.1.1:消息队列的优势

2.1.1.1:异步处理化

生产者发送消息之后,可以继续去忙别的,消费者什么时候消费都可以,不产生阻塞。

2.1.1.2:削峰填谷

先把用户的请求放到消息队列种,消费者(实际执行操作的应用)可以按照自己的需求,慢慢去取。

举个栗子:

原本:

12点时来了10万个请求,原本情况下,10万个请求都在系统内部立刻处理,很快系统压力过大宕机。

现在:

把10万个请求放到消息队列中,处理系统以自己的恒定速率(比如每秒1个)慢慢执行,稳定处理。

2.2:分布式消息队列

2.2.1:分布式消息队列的优势

分布式消息队列继承于消息队列的优势,并进行了一部分的拓展。

2.2.1.1:数据的持久化

把消息集中存储在硬盘当中,服务器重启就不会丢失。

2.2.1.2:可扩展性

可以根据需求,随时增加(或减少)节点,继续保持稳定的服务。

2.2.1.3:应用解耦

可以连接不同语言(Java,PHP),框架开发的系统,让这些系统读取数据。

示例:

以前的项目:

加了分布式消息队列之后的项目: 

1:一个系统挂了,不影响另一个系统。

2:系统挂了之后并恢复,仍然可以从消息队列中取消息

3:只要发送消息到队列,就可以立即进行返回,不用同步调用所有系统,性能更高

2.2.1.4:发送订阅 

假设情景:当QQ进行了一部分改革之后,其他使用QQ的APP也应该处理
这部分改革。
QQ做了一个情景,要让其他系统知道,比如公告消息。如果QQ一次性给这些应用发消息,所引出的问题如下:
1.每次发通知都要调用很多系统,很麻烦,很可能失败
2.不知道哪个系统需要这些QQ的改革。
解决方案:大的核心系统始终往消息队列发消息,其他的系统都去订阅这个消息队列的消息,用的时候进行取就OK。

2.2.2:分布式消息队列的应用场景 

1:耗时场景。

2:高并发场景。

3:分布式系统的协作。(跨团队,跨业务合作,应用解耦)

4:强稳定的场景(金融业务,持久化,可靠性,削锋填谷)  

三:Rabbitmq

特点:生态好,易学习,易于理解,时效性强,支持不同语言的客户端,扩展性,可用性都很不错。

3.1:基本概念

AMPQ协议:Rabbitmq是遵循AMPQ协议的一种消息中间件。

生产者:发消息到交换机

消费者:收消息的,从某个队列中取消息

交换机(exchange):负责把消息转发到对应的队列

队列(Queue):存储消息的

路由(Rountes):转发,怎么把一个消息从一个地方转发到另一个地方(比如生产者转发到某个队列)

Rabbitmq:端口占用   5672:程序连接的端口 15672:管理界面端口

Rabbitmq的安装:https://blog.csdn.net/qq_25919879/article/details/113055350

管理器页面打不开:http://t.csdnimg.cn/6FqZl

3.2:快速入门 

3.2.1:引入消息队列Java客户端

<dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.17.0</version>
 </dependency>

3.2.2:单消费开发生产者和消费者

生产者端代码:

public class SingeProducer {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             //频道相当于客户端(jdbcClient,redisClient),提供了和消队列server建立通信,程序通过channel进行发送消息
             Channel channel = connection.createChannel()) {
            //创建消息队列,第二个参数(durable):是否开启持久化,第三个参数exclusiove:是否允许当前这个创建消息队列的
            //连接操作消息队列 第四个参数:没有人使用队列,是否需要删除
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //发送消息
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

消费者代码:

public class SingeConsumer {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        //创建频道,提供通信
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        //如何处理消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

 3.2.3:多消费开发生产者和消费者

场景:一个生产者给队列里面发了一条消息,多个消费者进行消费。适用于多个机器同时去接收并处理任务(每个机器处理任务有限)

队列持久化:

durable:

参数设置为true,服务器队列不丢失

 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

 消息持久化:

 指定MessageProperties.PERSISTENY_TEXT_PLAIN参数

    channel.basicPublish("", TASK_QUEUE_NAME,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes("UTF-8"));

生产者端代码:

public class MultiProducer {
    private static final String TASK_QUEUE_NAME = "multi_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
            Scanner scanner=new Scanner(System.in);
            while(scanner.hasNext()){
                String message = scanner.nextLine();
                channel.basicPublish("", TASK_QUEUE_NAME,
                        MessageProperties.PERSISTENT_TEXT_PLAIN,
                        message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + "'");
            }

        }
    }
}

消费者代码: 

在消费者代码中,如何测验一个消费者只能取一个任务,我们利用for循环来进行解决。

指定确认某条消息:

第一个参数:获取消息的信息

第二个参数:如果是true,把所有的历史消息全都确认了。如果为false,取出当前的消息。

   //第二个参数:是否一次性取所有的消息。如果为true,则要取所有的挤压在消息队列中的消息
   //如果为false,则为一次性取一个消息
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

指定拒绝某条消息

第一个参数:获取消息的信息

第二个参数:如果是true,则代表是否要拒绝所有的历史消息。

第三个参数:如果是false, 则代表失败的任务是否要重新入队。

  channel.basicNack(delivery.getEnvelope().getDeliveryTag(),false,false);
public class MultiConsumer {
    private static final String TASK_QUEUE_NAME = "multi_queue";

    public static void main(String[] argv) throws Exception {
        //建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        final Connection connection = factory.newConnection();
        for (int i = 0; i <= 2; i++) {
            final Channel channel = connection.createChannel();
            int finalI=i;
            //声明队列
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            //控制单个消费者的任务积压数:每个消费者最多处理一个任务,每个消费者智能处理一个任务
            channel.basicQos(1);
            //处理从队列中取的的消息
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                try {
                    //处理工作
                    System.out.println(" [x] Received '" +"编号:"+finalI+ message + "'");

                    //停20秒模拟一个机器处理工作能力有限
                    Thread.sleep(20000);
                    //第二个参数:是否一次性取所有的消息。如果为true,则要取所有的挤压在消息队列中的消息
                    //如果为false,则为一次性取一个消息
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    System.out.println(" [x] Done");
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            };
            //开启消费监听
            channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });

        }

    }

}

3.3.3:交换机

一个生产者给多个队列发消息,一个生产者对多个队列。交换机:转发功能,怎么把消息转发到不同的队列上。

3.3.3.1:交换机的类别
a):fanout

场景:很适用于发布订阅的场景。

特点:消息会被转发到所有绑定到交换机的队列。

生产者代码:当生产者发送消息后,由交换机放到消息队列中,消费者从消息队列中取。

public class FonoutProducer {

        private static final String EXCHANGE_NAME = "1";

        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {
                //创建交换机
                channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
                Scanner scanner=new Scanner(System.in);
                while(scanner.hasNext()){
                    String message = scanner.nextLine();
                    channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
                    System.out.println(" [x] Sent '" + message + "'");
                }

            }
        }

}

消费者代码:

public class FonoutConsumer {


    private static final String EXCHANGE_NAME = "1";
    public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            Channel channel2= connection.createChannel();
            //声明交换机
            //创建队列,随机分配一个队列名称
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            String queueName="xiaowang";
            channel.queueDeclare(queueName,true,false,false,null);
            channel.queueBind(queueName, EXCHANGE_NAME, "");

            channel2.exchangeDeclare(EXCHANGE_NAME, "fanout");
            String queueName2="xiaoli";
            channel2.queueDeclare(queueName2,true,false,false,null);
            channel2.queueBind(queueName2,EXCHANGE_NAME,"");

            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [小王] Received '" + message + "'");
            };
            DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [小李] Received '" + message + "'");
            };
            channel.basicConsume(queueName, true, deliverCallback1, consumerTag -> { });
            channel.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { });
        }
    }

运行结果:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1140956.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

墨西哥专线空运载重限制是多少?

在国际贸易中&#xff0c;物流运输是至关重要的一环。对于需要将货物运输至墨西哥的客户来说&#xff0c;了解墨西哥专线空运的载重限制是非常必要的。本文将为您详细解析墨西哥空运的相关知识。 一、墨西哥专线空运简介 墨西哥专线空运是指通过专门的航空公司或货运代理公司&…

SSM咖啡点餐管理系统开发mysql数据库web结构java编程计算机网页源码eclipse项目

一、源码特点 SSM 咖啡点餐管理系统是一套完善的信息系统&#xff0c;结合SSM框架完成本系统&#xff0c;对理解JSP java编程开发语言有帮助系统采用SSM框架&#xff08;MVC模式开发&#xff09;&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主 要采用B/S模式开…

​iOS安全加固方法及实现

目录 iOS安全加固方法及实现 摘要 引言 iOS安全加固方法及实现 一、字符串加密 二、类名方法名混淆 三、程序代码混淆 四、加入安全SDK 总结 参考资料 摘要 本文介绍了iOS平台下的应用安全保护方法&#xff0c;包括字符串加密、类名方法名混淆、程序代码混淆和加入安全…

贪吃蛇-c语言版本

目录 前言 贪吃蛇游戏设计与分析 设计目标&#xff1a; 设计思想&#xff1a; 坐标问题&#xff1a; 字符问题&#xff1a; 小拓展&#xff1a;C语⾔的国际化特性 本地化头文件&#xff1a; 类项 setlocale函数&#xff1a; 宽字符打印&#xff1a; 地图坐标: &am…

如何创建加载项(1)

《VBA高级应用30例》&#xff08;10178985&#xff09;&#xff0c;是我推出的第十套教程&#xff0c;教程是专门针对高级学员在学习VBA过程中提高路途上的案例展开&#xff0c;这套教程案例与理论结合&#xff0c;紧贴“实战”&#xff0c;并做“战术总结”&#xff0c;以便大…

智安网络|揭秘安全测试和渗透测试的异同点

随着云计算的兴起&#xff0c;企业网络安全面临着新的挑战。为了保护企业的核心资产和数据安全&#xff0c;堡垒机作为一种重要的安全设备被广泛应用。传统的堡垒机在过去几十年中发挥了重要作用&#xff0c;但如今&#xff0c;随着云堡垒机的出现&#xff0c;企业在选择合适的…

BLE基础

文章是视频笔记 蓝牙广播 教程讲义 ESP32之低功耗蓝牙&#xff08;BLE&#xff09;-小鱼创意 蓝牙信道 BLE&#xff08;Bluetooth Low Energy&#xff09;广播使用的是2.4GHz ISM频段&#xff0c;其中包含了40个信道。在BLE广播中&#xff0c;主要使用的是3个不相邻的信道&…

2023年第四届MathorCup大数据竞赛(A题)|坑洼道路检测和识别|数学建模完整代码+建模过程全解全析

当大家面临着复杂的数学建模问题时&#xff0c;你是否曾经感到茫然无措&#xff1f;作为2021年美国大学生数学建模比赛的O奖得主&#xff0c;我为大家提供了一套优秀的解题思路&#xff0c;让你轻松应对各种难题。 希望这些想法对大家的做题有一定的启发和借鉴意义。 让我们来…

普及篇|云备份和云容灾,你用对了吗?

企业常常会因为自然灾害、硬件老化故障、人为误操作等问题导致业务系统中断&#xff0c;该如何进行安全、高效的数据保护&#xff0c;这一问题引起企业越来越多的高度关注。 而对比传统的自建机房对数据进行保护&#xff0c;在云端利用云计算的弹性伸缩、按需扩展特点&#xff…

公众号迁移如何线上办理公证?

公众号账号迁移的作用是什么&#xff1f;只能变更主体吗&#xff1f;1.可合并多个公众号的粉丝、文章&#xff0c;打造超级大V2.可变更公众号主体&#xff0c;更改公众号名称&#xff0c;变更公众号类型——订阅号、服务号随意切换3.可以增加留言功能4.个人订阅号可迁移到企业名…

实现寄生组合继承

寄生组合继承是一种继承方式&#xff0c;它通过组合使用构造函数继承和原型继承的方式&#xff0c;实现了高效而且正确的继承方式。 具体实现步骤如下&#xff1a; ① 定义一个父类&#xff0c;实现其属性和方法&#xff1a; function Person(name) {this.name namethis.age…

Google play 应用下架、封号常见原因:8.3/10.3分发协议及恶意软件政策问题浅析

相信大多数谷歌Android开发者都遭遇过应用下架、账号被封的情况&#xff0c;尤其对于想通过上传马甲包、矩阵方式来获得更多收益的开发者来说&#xff0c;想必应用下架、拒审、账号被封已经是家常便饭了&#xff0c;同时也为此烦恼。 造成这种情况的原因有很多&#xff0c;且每…

腾讯云新用户优惠券领取方法及使用教程

腾讯云作为国内领先的云计算服务提供商&#xff0c;为了吸引更多的新用户&#xff0c;经常会推出各种优惠活动。其中&#xff0c;最吸引新用户的还是腾讯云优惠券&#xff0c;本文将详细介绍腾讯云新用户优惠券的领取方法及使用教程&#xff0c;助力大家轻松上云&#xff01; 一…

特殊类设计(只在堆/栈上创建对象,单例模式),完整版代码+思路

目录 类不能被拷贝 类不能被继承 只在堆上创建对象 只在栈上创建对象 operator new operator delete 只能创建一个对象 设计模式 介绍 常见的设计模式 单例模式 介绍 应用 饿汉模式 介绍 实现 思路 代码 使用 懒汉模式 引入 介绍 实现 思路 代码 使用…

【Javascript】编写⼀个函数,排列任意元素个数的数字数组,按从⼩到⼤顺序输出

目录 sort方法 两个for循环 写法一&#xff1a; 写法二&#xff1a; sort方法 var list[3,6,2,8,1,7];list.sort();console.log(list);使用sort方法有局限&#xff0c;适合元素为个位数 var list[3,6,80,100,78,4];list.sort();console.log(list);如果元素 解决方法&#xf…

VR数字党建:红色文化展厅和爱国主义教育线上线下联动

伴随着党建思想的加深&#xff0c;很多政府单位都有打造VR党建展厅的想法&#xff0c;而党建基地也是激发爱国热情、凝聚人民力量、培养民族精神的重要场所。现如今&#xff0c;伴随着5G、VR等技术的成熟&#xff0c;VR数字党建积极推动运用VR技术&#xff0c;推动红色文化展厅…

linux 实时调度实现

调度入口__schedule() 主要做了几件事: deactivate_task() -> pick_next_task() -> context_switch() pick_next_task 的实现中主要两个步骤: (IN __pick_next_task)put_prev_task_balance(rq, prev, rf);for_each_class(class) {p class->pick_next_task(rq);if…

Java 将数据导出到Excel并发送到在线文档

一、需求 现将列表数据&#xff0c;导出到excel,并将文件发送到在线文档&#xff0c;摒弃了以往的直接在前端下载的老旧模式。 二、pom依赖 <!-- redission --><dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-…

AI 引擎系列 1 - 从 AI 引擎工具开始(2022.1 更新)

AI 引擎系列 1 - 从 AI 引擎工具开始&#xff08;2022.1 更新&#xff09; AI 引擎系列简介 在这篇题为 Versal 自适应 SoC AI 引擎入门的文章中&#xff0c;我介绍了一些 Versal™ 自适应 SoC 器件中存在的 AI 引擎 (AIE) 阵列。本系列是全新的 AI 引擎系列博文&#xff0c;我…

LeetCode--2.两数相加

文章目录 1 题目描述2 解题思路2.1 代码实现 1 题目描述 给你两个 非空 的链表, 表示两个非负的整数。它们每位数字都是按照 逆序 的方式存储的, 并且每个节点只能存储 一位 数字 请你将两个数相加, 并以相同形式返回一个表示和的链表 你可以假设除了数字 0 之外, 这两个数都…