RabbitMQ的常见工作模式

news2024/11/20 1:52:44

Work queues 工作队列模式

模式说明

通过Helloworld工程我们已经能够构建一个简单的消息队列的基本项目,项目中存在几个角色:生产 者、消费者、队列,而对于我们真实的开发中 ,对于消息的消费者通过是有多个的。

比如在实现用户注册功能时,用户注册成功,会给响对应用户发送邮件,同时给用户发送手机短信,告诉用户已成功注册网站或者app 应用,这种功能在大部分项目开发中都比较常见 ,而对于helloworld 的应用中虽然能够对 消息进行消费,但是有很大问题: 消息消费者只有一个,当消息量非常大时,单个消费者处理消息就会变得很慢,同时给节点页带来很大压力,导致消息堆积越来越多。对于这种情况,RabbitMQ 提供了工作 队列模式,通过工作队列提供做个消费者,对MQ产生的消息进行消费,提高MQ消息的吞吐率,降低消息的处理时间。处理模型图如下。

在这里插入图片描述

实现步骤

生产者

package cn.wolfcode.java.rabbitmq._02worker;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

/**
 * Created by wolfcode-fanjialong
 */

public class NewTask {
    private static final String TASK_QUEUE_NAME = "task_queue";
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.142.129");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
            String message = "tasequeue";
            for(int i=0;i<20;i++){
                channel.basicPublish("", TASK_QUEUE_NAME,
                        MessageProperties.PERSISTENT_TEXT_PLAIN,

                        (message+i).getBytes("UTF-8"));

            }
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

消费者

package cn.wolfcode.java.rabbitmq._02worker;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.util.concurrent.TimeUnit;

/**

 * Created by wolfcode-fanjialong

 */

public class Worker {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.142.129");
        final Connection connection = factory.newConnection;
        final Channel channel = connection.createChannel();
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        //channel.basicQos(1);
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }

            System.out.println(" [x] Received '" + message + "'");
        };

        channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
    }

}
简单问题说明

从结果可以看出消息被平均分配到两个消费方,来对消息进行处理,提高了消息处理效率,创建多个消费者来对消息进行处理。这里RabitMQ采用轮询来对消息进行分发时保证了消息被平均分配到每个消费方 。

但是引入新的问题:真正的生产环境下,对于消息的处理基本不会像我们现在看到的这样,每个消 费方处理的消息数量是平均分配的,比如因为网络原因,机器cpu ,内存等硬问题,消费方处理消息时 同类消息不同机器进行处理时消耗时间也是不一样的,比如1号消费者消费1条消息时1秒,2号消费者消费1条消息是5秒,对于1号消费者比2号消费者处理消息快,那么在分配消息时就应该让1号消费者多收 到消息进行处理,也即是我们通常所说的”能者多劳”,同样Rabbitmq对于这种消息分配模式提供了支持。

问题: 任务量很大,消息虽然得到了及时的消费,单位时间内消息处理速度加快,提高了吞吐量,可 是不同消费者处理消息的时间不同,导致部分消费者的资源被浪费。

解决:采用消息公平分发。

总结:工作队列消息轮询分发消费者收到的消息数量平均分配,单位时间内消息处理速度加快,提高了吞吐量。

工作模式队列-消息公平分发(fair dispatch)

在案例01中对于消息分发采用的是默认轮询分发,消息应答采用的自动应答模式,这是因为当消息进 入队列,RabbitMQ就会分派消息。它不看消费者为应答的数目,只是盲目的将第n条消息发给第n个消费者。

为了解决这个问题,我们使用 basicQos(prefetchCount = 1) 方法,来限RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。执行模型图如下:

在这里插入图片描述

Pub/Sub 订阅模式

模式说明

在这里插入图片描述

在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:

P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)

C:消费者,消息的接收者,会一直等待消息到来

Queue:消息队列,接收消息、缓存消息

Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、

递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:

  • Fanout:广播,将消息交给所有绑定到交换机的队列

  • Direct:定向,把消息交给符合指定routing key 的队列

  • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!

实现步骤

生产者

package cn.wolfcode.java.rabbitmq._03pubsub;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Created by wolfcode-fanjialong
 */
public class EmitLog {
    private static final String EXCHANGE_NAME = "logs";
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
            Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            String message = "info: Hello World!";
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

消费者

package cn.wolfcode.java.rabbitmq._03pubsub;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

/**
 * Created by wolfcode-fanjialong
 */

public class ReceiveLogs {
    private static final String EXCHANGE_NAME = "logs";
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

Routing 路由模式

模式说明

队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)

消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey

Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的 Routingkey 与消息的 Routing key 完全一致,才会接收到消息
在这里插入图片描述

图解

P:生产者,向 Exchange 发送消息,发送消息时,会指定一个routing key

X:Exchange(交换机),接收生产者的消息,然后把消息递交给与 routing key 完全匹配的队列

C1:消费者,其所在队列指定了需要 routing key 为 error 的消息

C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息

实现步骤

生产者

package cn.wolfcode.java.rabbitmq._04rounting;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionF![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/6bffe712644346098952dba200523856.png)
actory;
/**
 * Created by wolfcode-fanjialong
 */

public class EmitLogDirect {
    private static final String EXCHANGE_NAME = "direct_logs";
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            String severity = "info";
            String message = "directMsg";
            channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
        }
    }
}

消费者

package cn.wolfcode.java.rabbitmq._04rounting;

import com.rabbitmq.client.*;

/**

 * Created by wolfcode-fanjialong

 */

public class ReceiveLogsDirect {
    private static final String EXCHANGE_NAME = "direct_logs";
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "info");
        channel.queueBind(queueName, EXCHANGE_NAME, "error");
        channel.queueBind(queueName, EXCHANGE_NAME, "warning");
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }

}

Topic 模式

模式介绍

Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型

Exchange 可以让队列在绑定 Routing key 的时候使用通配符

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

通配符规则:# 匹配一个或多个词,* 匹配不多不少恰好1个词,例如:item.# 能够匹配 item.insert.abc 或者 item.insert,item.* 只能匹配 item.insert

在这里插入图片描述

图解:

红色 Queue:绑定的是 usa.# ,因此凡是以 usa. 开头的 routing key 都会被匹配到

黄色 Queue:绑定的是 #.news ,因此凡是以 .news 结尾的 routing key 都会被匹配

实现步骤

生产者

package cn.wolfcode.java.rabbitmq._05topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**

 * Created by wolfcode-fanjialong

 */

public class EmitLogTopic {

    private static final String EXCHANGE_NAME = "topic_logs";
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            String routingKey = "order1.save";
            String message = "topicMsg";
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
        }

    }

}

消费者

package cn.wolfcode.java.rabbitmq._05topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

/**

 * Created by wolfcode-fanjialong

 */

public class ReceiveLogsTopic {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "order.*");
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };

        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });

    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1478514.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【GameFramework框架内置模块】7、事件(Event)

推荐阅读 CSDN主页GitHub开源地址Unity3D插件分享简书地址 大家好&#xff0c;我是佛系工程师☆恬静的小魔龙☆&#xff0c;不定时更新Unity开发技巧&#xff0c;觉得有用记得一键三连哦。 一、前言 【GameFramework框架】系列教程目录&#xff1a; https://blog.csdn.net/q7…

金三银四,自动化测试面试题精选【美团二面】

面试一般分为技术面和hr面&#xff0c;形式的话很少有群面&#xff0c;少部分企业可能会有一个交叉面&#xff0c;不过总的来说&#xff0c;技术面基本就是考察你的专业技术水平的&#xff0c;hr面的话主要是看这个人的综合素质以及家庭情况符不符合公司要求&#xff0c;一般来…

Ps:明度直方图

明度 Luminosity直方图显示了图像中各个亮度级别的像素分布情况。 与 RGB 直方图不同&#xff0c;“明度”直方图专注于图像的亮度信息&#xff0c;而不是单独的颜色信息。 在“直方图”面板的通道中选择“明度”。 “明度”直方图提供了一种量化的方式来理解图像的整体明暗结构…

项目技术栈-解决方案-消息队列

项目技术栈-解决方案-消息队列 概念应用场景1. 异步处理 参考文章消息队列&#xff08;Message Queue&#xff09; 概念 “消息”是在两台计算机间传送的数据单位。 消息可以非常简单&#xff0c;例如只包含文本字符串&#xff1b; 也可以更复杂 &#xff0c;包括对象等。 队…

视频生成模型Sora的全面解析:从AI绘画、ViT到ViViT、DiT、VDT、NaViT、VideoPoet

前言 真没想到&#xff0c;距离视频生成上一轮的集中爆发(详见《Sora之前的视频生成发展史&#xff1a;从Gen2、Emu Video到PixelDance、SVD、Pika 1.0》)才过去三个月&#xff0c;没想OpenAI一出手&#xff0c;该领域又直接变天了 自打2.16日OpenAI发布sora以来(其开发团队包…

农业四情监测设备为什么符合高标准农田建设

TH-Q3随着科技的不断进步&#xff0c;智慧农业正逐渐成为现代农业发展的重要方向。其中&#xff0c;农业四情监测系统以其独特的功能和优势&#xff0c;在高标准农田建设中发挥着越来越重要的作用。 一、农业四情监测系统的概念及功能 农业四情监测系统&#xff0c;顾名思义&am…

一道题目总结出一个模版(简单记录一下,感觉挺有用的)

代码如下 using ll long long; int main() {ll n, m,ans0,i;std::cin >> n >> m;std::vector<ll>a(m1);for (int i 1; i < m; i) {std::cin >> a[i];a[i] a[i - 1];}//如果m<n,那么只够写第一篇文章ans a[1] * std::min(m,n);for (i n; i …

开源项目:图像分类技术在医疗影像分析中的应用与实践

一、引言 在当今快速发展的医疗行业中&#xff0c;数字医疗正逐渐成为提升医疗服务质量和效率的关键力量。本项目旨在通过整合医药电商、远程问诊、慢病管理等多维度服务&#xff0c;为消费者和企业提供全面的医疗解决方案。项目的核心在于运用先进的图像分类技术&#xff0c;以…

sql注入less46作业三

采用报错注入 updatexml(XML_document,XPath_string,new_value) 一共可以接收三个参数&#xff0c;报错位置在第二个参数。 ?sort1 and updatexml(1,concat(0x7e,database(),0x7e),1)-- #查询库名 ?sort1 and updatexml(1,concat(0x7e,(select group_concat(table_name) fr…

第三百七十回

文章目录 1. 概念介绍2. 使用方法2.1 获取所有时区2.2 转换时区时间 3. 示例代码4. 内容总结 我们在上一章回中介绍了"分享一些好的Flutter站点"相关的内容&#xff0c;本章回中将介绍timezone包.闲话休提&#xff0c;让我们一起Talk Flutter吧。 1. 概念介绍 我们在…

OpenAI Triton 入门教程

文章目录 Triton 简介背景Triton 与 CUDA 的关系 Triton 开发样例样例一&#xff1a;Triton vector addition 算子Triton kernel 实现kernel 函数封装函数调用性能测试 样例二&#xff1a;融合 Softmax 算子动机Triton kernel 实现kernel 封装单元测试性能测试 样例三&#xff…

服了,阿里云服务器和腾讯云服务器价格差不多怎么选择?

2024年阿里云服务器和腾讯云服务器价格战已经打响&#xff0c;阿里云服务器优惠61元一年起&#xff0c;腾讯云服务器62元一年&#xff0c;2核2G3M、2核4G、4核8G、8核16G、16核32G、16核64G等配置价格对比&#xff0c;阿腾云atengyun.com整理阿里云和腾讯云服务器详细配置价格表…

【软件测试】接口调不通排查分析+常遇面试题总结

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 1、接口调不通&am…

Leetcode583. 两个字符串的删除操作 -代码随想录

题目&#xff1a; 代码(首刷自解 2024年2月29日&#xff09;&#xff1a; class Solution { public:// 动态规划 好像和找最长公共子序列一样&#xff1f;int minDistance(string word1, string word2) {int sz1 word1.size();int sz2 word2.size();// dp initvector<vec…

是谁家的小千金跑出来了?

古典的山树绣花设计 精致典雅&#xff0c;上身立体又轻盈 做了粉绿两色&#xff0c;很适合春天的氛围 春天是个适合外出游玩的季节 穿上这件出游真的超美&#xff0c;日常穿也可 超出片很吸睛&#xff01;

JavaEE——简单认识JavaScript

文章目录 一、简单认识 JavaScript 的组成二、基本的输入输出和简单语法三、变量的使用四、JS 中的动态类型图示解释常见语言的类型形式 五、JS中的数组六、JS 中的函数七、JS 中的对象 一、简单认识 JavaScript 的组成 对于 JavaScript &#xff0c;其中的组成大致分为下面的…

综合练习(一)

目录 列出薪金高于部门 30 的所有员工薪金的员工姓名和薪金、部门名称、部门人数 列出与 ALLEN从事相同工作的所有员工及他们的部门名称、部门人数、领导姓名 Oracle从入门到总裁:https://blog.csdn.net/weixin_67859959/article/details/135209645 列出薪金高于部门 30 的所…

打造去中心化透明储蓄罐:Solidity智能合约的又一实践

一、案例背景 传统的储蓄罐通常是由个人或家庭使用&#xff0c;用于存放硬币或小额纸币。然而&#xff0c;这样的储蓄罐缺乏透明性&#xff0c;用户无法实时了解储蓄情况&#xff0c;也无法确保资金的安全性。 通过Solidity智能合约&#xff0c;我们可以构建一个去中心化…

论文笔记:基于互信息估计和最大化的深度表示学习

整理了ICLR2019 LEARNING DEEP REPRESENTATIONS BY MUTUAL INFORMATION ESTIMATION AND MAXIMIZATION&#xff09;论文的阅读笔记 背景模型 论文地址&#xff1a;DIM code&#xff1a;代码地址 背景 发现有用的表示是深度学习的一个核心目标&#xff0c;由于之前的工作已经可以…

使用js写一个登录验证码效果

面试题 登录页面获取验证码的功能&#xff0c;用户点击获取验证码按钮(id”btn1”)&#xff0c;按文字变为“(N)后获取验证码”&#xff0c;N为倒计对秒数&#xff0c;从 60 开始&#xff0c;每秒减一&#xff0c;减到 0的时候&#xff0c;按钮文字变为“获取验证码”&#xff…