RabbitMQ消息队列(三):任务分发机制

news2024/12/27 14:35:23

在上篇文章中,我们解决了从发送端(Producer)向接收端(Consumer)发送“Hello World”的问题。在实际的应用场景中,这是远远不够的。从本篇文章开始,我们将结合更加实际的应用场景来讲解更多的高级用法。

当有Consumer需要大量的运算时,RabbitMQ Server需要一定的分发机制来balance每个Consumer的load。试想一下,对于web application来说,在一个很多的HTTP request里是没有时间来处理复杂的运算的,只能通过后台的一些工作线程来完成。接下来我们分布讲解。

应用场景就是RabbitMQ Server会将queue的Message分发给不同的Consumer以处理计算密集型的任务:

1. 准备

在上一篇文章中,我们简单在Message中包含了一个字符串”Hello World”。现在为了是Consumer做的是计算密集型的工作,那就不能简单的字符串了。在现实应用中,Consumer有可能做的是一个图片的resize,或者是pdf文件的渲染或者内容提取。但是作为Demo,还是用字符串模拟吧:通过字符串中的.的数量来决定计算的复杂度,每个.都会消耗1s,即sleep(1)。

还是复用上篇文章中的code,根据“计算密集型”做一下简单的修改,为了辨别,我们把send.py 的名字换成new_task.py

import sys

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',routing_key='hello',body=message)
print " [x] Sent %r" % (message,)

同样的道理,把receive.py的名字换成worker.py,并且根据Message中的.的数量进行计算密集型模拟:

import time

def callback(ch, method, properties, body):print " [x] Received %r" % (body,)time.sleep( body.count('.') )print " [x] Done"

2. Round-robin dispatching 循环分发

RabbitMQ的分发机制非常适合扩展,而且它是专门为并发程序设计的。如果现在load加重,那么只需要创建更多的Consumer来进行任务处理即可。当然了,对于负载还要加大怎么办?我没有遇到过这种情况,那就可以创建多个virtual Host,细化不同的通信类别了。

首先开启两个Consumer,即运行两个worker.py。

Console1:

shell1$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C

Consule2:

shell2$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C

Producer new_task.py要Publish Message了:

shell3$ python new_task.py First message.
shell3$ python new_task.py Second message..
shell3$ python new_task.py Third message...
shell3$ python new_task.py Fourth message....
shell3$ python new_task.py Fifth message.....

注意一下:.代表的sleep(1)。接着开一下Consumer worker.py收到了什么:

Console1:

shell1$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'

Console2:

shell2$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'

默认情况下,RabbitMQ 会顺序的分发每个Message。当每个收到ack后,会将该Message删除,然后将下一个Message分发到下一个Consumer。这种分发方式叫做round-robin。这种分发还有问题,接着向下读吧。

3. Message acknowledgment 消息确认

每个Consumer可能需要一段时间才能处理完收到的数据。如果在这个过程中,Consumer出错了,异常退出了,而数据还没有处理完成,那么非常不幸,这段数据就丢失了。因为我们采用no-ack的方式进行确认,也就是说,每次Consumer接到数据后,而不管是否处理完成,RabbitMQ Server会立即把这个Message标记为完成,然后从queue中删除了。

如果一个Consumer异常退出了,它处理的数据能够被另外的Consumer处理,这样数据在这种情况下就不会丢失了(注意是这种情况下)。

为了保证数据不被丢失,RabbitMQ支持消息确认机制,即

acknowledgments

。为了保证数据能被正确处理而不仅仅是被Consumer收到,那么我们不能采用no-ack。而应该是在处理完数据后发送ack。

在处理数据后发送的ack,就是告诉RabbitMQ数据已经被接收,处理完成,RabbitMQ可以去安全的删除它了。

如果Consumer退出了但是没有发送ack,那么RabbitMQ就会把这个Message发送到下一个Consumer。这样就保证了在Consumer异常退出的情况下数据也不会丢失。

这里并没有用到超时机制。RabbitMQ仅仅通过Consumer的连接中断来确认该Message并没有被正确处理。也就是说,RabbitMQ给了Consumer足够长的时间来做数据处理。

默认情况下,消息确认是打开的(enabled)。在上篇文章中我们通过no_ack = True 关闭了ack。重新修改一下callback,以在消息处理完成后发送ack:

def callback(ch, method, properties, body):print " [x] Received %r" % (body,)time.sleep( body.count('.') )print " [x] Done"ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,queue='hello')

这样即使你通过Ctr-C中断了worker.py,那么Message也不会丢失了,它会被分发到下一个Consumer。

如果忘记了ack,那么后果很严重。当Consumer退出时,Message会重新分发。然后RabbitMQ会占用越来越多的内存,由于RabbitMQ会长时间运行,因此这个“内存泄漏”是致命的。去调试这种错误,可以通过一下命令打印un-acked Messages:

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello0 0
...done.

4. Message durability消息持久化

在上一节中我们知道了即使Consumer异常退出,Message也不会丢失。但是如果RabbitMQ Server退出呢?软件都有bug,即使RabbitMQ Server是完美毫无bug的(当然这是不可能的,是软件就有bug,没有bug的那不叫软件),它还是有可能退出的:被其它软件影响,或者系统重启了,系统panic了。。。

为了保证在RabbitMQ退出或者crash了数据仍没有丢失,需要将queue和Message都要持久化。

queue的持久化需要在声明时指定durable=True:

channel.queue_declare(queue='hello', durable=True)

上述语句执行不会有什么错误,但是确得不到我们想要的结果,原因就是RabbitMQ Server已经维护了一个叫hello的queue,那么上述执行不会有任何的作用,也就是hello的任何属性都不会被影响。这一点在上篇文章也讨论过。

那么workaround也很简单,声明一个另外的名字的queue,比如名字定位task_queue:

channel.queue_declare(queue='task_queue', durable=True)

再次强调,Producer和Consumer都应该去创建这个queue,尽管只有一个地方的创建是真正起作用的:

接下来,需要持久化Message,即在Publish的时候指定一个properties,方式如下:

channel.basic_publish(exchange='',routing_key="task_queue",body=message,properties=pika.BasicProperties( delivery_mode = 2, # make message persistent))

关于持久化的进一步讨论:

为了数据不丢失,我们采用了:

1.在数据处理结束后发送ack,这样RabbitMQ Server会认为Message Deliver 成功。
2.持久化queue,可以防止RabbitMQ Server 重启或者crash引起的数据丢失。
3.持久化Message,理由同上。

但是这样能保证数据100%不丢失吗?

答案是否定的。问题就在与RabbitMQ需要时间去把这些信息存到磁盘上,这个time window虽然短,但是它的确还是有。在这个时间窗口内如果数据没有保存,数据还会丢失。还有另一个原因就是RabbitMQ并不是为每个Message都做fsync:它可能仅仅是把它保存到Cache里,还没来得及保存到物理磁盘上。

因此这个持久化还是有问题。但是对于大多数应用来说,这已经足够了。当然为了保持一致性,你可以把每次的publish放到一个

transaction

中。这个transaction的实现需要user defined codes。

那么商业系统会做什么呢?一种可能的方案是在系统panic时或者异常重启时或者断电时,应该给各个应用留出时间去flash cache,保证每个应用都能exit gracefully。

5. Fair dispatch 公平分发

你可能也注意到了,分发机制不是那么优雅。默认状态下,RabbitMQ将第n个Message分发给第n个Consumer。当然n是取余后的。它不管Consumer是否还有unacked Message,只是按照这个默认机制进行分发。

那么如果有个Consumer工作比较重,那么就会导致有的Consumer基本没事可做,有的Consumer却是毫无休息的机会。那么,RabbitMQ是如何处理这种问题呢?

通过 basic.qos 方法设置prefetch_count=1 。这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理一个Message。换句话说,在接收到该Consumer的ack前,他它不会将新的Message分发给它。 设置方法如下:

channel.basic_qos(prefetch_count=1)

注意,这种方法可能会导致queue满。当然,这种情况下你可能需要添加更多的Consumer,或者创建更多的virtualHost来细化你的设计。

6. 最终版本

new_task.py script:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',routing_key='task_queue',body=message,properties=pika.BasicProperties( delivery_mode = 2, # make message persistent))
print " [x] Sent %r" % (message,)
connection.close() 

worker.py script:

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):print " [x] Received %r" % (body,)time.sleep( body.count('.') )print " [x] Done"ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,queue='task_queue')

channel.start_consuming()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/160895.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

jetson nano上编译与使用西门子PLC通讯库snap7

文章目录一.西门子snap7介绍二.西门子S7通讯介绍三.jetson nano编译snap7库四.Qt Cmake导入snap7库五.snap7主要函数说明1.与PLC建立连接2.读写PA区变量3.读写MK区变量六.通讯程序示例一.西门子snap7介绍 Snap7 是一个基于以太网与S7系列的西门子PLC通讯的开源库。支持包括S7系…

2023美赛数学建模ABCDEF题思路模型代码

占个位置吧,开始在本帖实时更新赛题思路代码,文章末尾获取! 持续为更新参考思路 赛题思路 会持续进行思路模型分析,下自行获取。 A题思路: (比赛开始后第一时间更新) B题思路:…

《算法分析与设计》复习笔记

目录 一、算法的基本概念 1.1 算法的定义 1.2 算法的“好坏”如何衡量? 1.3 描述算法的时间复杂度 ⭐ 1.4 如何评价算法 二、 分治法 2.1 分治法的求解步骤 2.2 平衡的概念 2.3 递归式解法 2.3.1 主定理法 ⭐ 2.4 分治法的使用条件 2.5 分治法实例 2.5…

助力安全作业生产,基于轻量级YOLOv6s开发实践反光衣检测识别分析系统

在很多实际作业生产场景中,出于对安全的考虑,施工作业等操作都是要求穿戴反光衣的,这个主要是为了保护人身安全,但是很多时候工程作业场景下因为实际种种的原因工人实际作业操作的时候很多人并没有按照要求穿戴反光衣这就给安全生…

OPTEE安全存储

本文主要介绍OPTEE的安全存储技术,翻译自官方文档:Secure storage — OP-TEE documentation documentation (optee.readthedocs.io) 一、背景 OP-TEE中的安全存储是根据GlobalPlatform的TEE Internal Core API(这里称为可信存储)…

2023/1/13总结

今天学习了链式向前星和唯一分解定理(数论)。 链式向前星 链式向前星是一种存储图的方法,在此之前我们学到过存储图的方式:邻接表以及邻接矩阵,邻接矩阵浪费了很大的空间,而邻接表 写起来的代码有一点点…

微信小程序wxml的数据和事件的绑定,以及条件和列表的渲染

文章目录1.数据绑定的基本原则在data中定义页面的数据2.事件绑定bingtap的语法格式:在事件处理函数中为data中的数据赋值事件传参bindinput的语法格式实现文本框和data之间的数据同步1.定义数据2.渲染结构3.美化样式4.绑定input事件处理函数3.条件渲染hiddenwx:if与hidden的对比…

数据库 表设计 MySQL

表设计 约束 为了保证入库数据的合理性,添加的各种规则。 约束的分类 准备测试用的表格: CREATE TABLE emp ( id INT, -- 员工id,主键且自增长 ename VARCHAR(50), -- 员工姓名,非空且唯一 joindate DATE, -- 入职日期&…

【uniapp】渲染列表数据删除项导致每项数据重置的问题解决方案

开发uniapp项目,使用的是JavaScript Vue写法,操作wList数组列表更新的时候,如果每一项都带input 或 radio组件,要操作移除的话,那么组件的输入数据会被清除重置,若不希望这样,那应该怎么做才好呢…

设计模式相关内容介绍—软件设计原则(六个)

在软件开发中,为了提高软件系统的可维护性和可复用性,增加软件的可扩展性和灵活性,程员要尽量根据6条原则来开发程序,从而提高软件开发效率、节约软件开发成本和维护成本。 目录 1.开闭原则 2.里氏代替原则 3.依赖倒转原则 4.接…

dvwa中的文件包含攻击

环境:dvwa: 192.168.11.135 dvwa版本: Version 1.9 (Release date: 2015-09-19)kail机器:192.168.11.156一、什么是文件包含漏洞?为简化代码,会把重复的code内容单独写到一个页面文件,然后再需要调用重复内容的页面中…

C语言:初识C语言

目录前言1. 什么是c语言呢2. 第一个c语言程序2. 数据类型3. 变量和常量3.1 变量3.1.1 变量的定义3.1.2 变量的分类3.1.3 变量的使用3.1.4 变量的作用域和生命周期3.2 常量4. 字符串、转义字符、注释4.1 字符串4.2 转义字符4.3 注释5. 选择语句6. 循环语句7. 函数8. 数组9. 操作…

学习笔记——keep-alive缓存组件,再次返回组件data数据重置

前言:使用keep-alive缓存组件,当再次返回该组件后,希望其组件中的数据或状态,保持上次离开该组件时的情况。 一、当前组件树 希望缓存HomeMain组件的状态。 二、错误处理 我在HomeMain的祖先组件HomeLayout中,写了如下…

sqlplus 连接数据库

终端直连 Oracle 数据库 ORA-12162 错误 出于各种网络原因,无法直连数据库,但又必须查询数据库数据 我们只能选择直连数据库的服务器 然后通过 sqlplus 连接 Oracle 从配置文件里获取这样一段信息 urljdbc:oracle:thin:192.168.1.3:1521:testdb use…

【SpringCloud08】SpringCloud Consul服务注册与发现

1.Consul简介 1.1是什么 官网 Consul 是一套开源的分布式服务发现和配置管理系统,由 HashiCorp 公司用Go 语言开发 提供了微服务系统中的服务治理、配置中心、控制总线等功能。这些功能中的每一个都可以根据需要单独使用,也可以一起使用以构建全方位…

基于MPLS-V**多分部互访的ensp企业网络规划与设计_ensp综合实验

作者:BSXY_19计科_陈永跃BSXY_信息学院注:未经允许禁止转发任何内容基于MPLS-V**多分部互访的ensp企业网络规划与设计_ensp综合实验前言及技术/资源下载说明( **未经允许禁止转发任何内容** )插曲:基于eNSP中大型校园/…

卡特加特数字中控主机,数字家庭控制中心!没它智能家居就是智障!

数字中控主机是数字家庭的核心,承担着“协调各方、总揽全局”的作用,是打造未来数字家庭空间必不可少的设备。区别于传统家居智能,它真正意义上告别了过去以设备为中心的架构,而是以人的个性化需求为中心,以数据作为资…

2020统考真题-距离最小三元组

2020年统考真题 定义三元组$ (a,b,c)$ ( a,b,c 均为正数)的距离 D∣a−b∣∣b−c∣∣c−a∣D|a−b||b−c||c−a|D∣a−b∣∣b−c∣∣c−a∣ 。给定 3个非空整数集合 S1 、 S2 和 S3 ,按升序分别存储在 3 个数组中。请设计一个尽可能高效的算…

蓝队攻击的四个阶段(三)

目录 一, 专业技能储备 1.工具开发技能 2.漏洞挖掘技能 3.代码调试技能 4.侦破拓展技能 二,目标网情搜集 1 何为网情搜集 2. 网情搜集的主要工作 三, 网情搜集的途径 1.专业网站 2.专业开发资源网站 3.目标官网 一, 专…

算法训练营 day17 二叉树 平衡二叉树 二叉树的所以路径 左叶子之和

算法训练营 day17 二叉树 平衡二叉树 二叉树的所以路径 左叶子之和 平衡二叉树 110. 平衡二叉树 - 力扣(LeetCode) 给定一个二叉树,判断它是否是高度平衡的二叉树。 本题中,一棵高度平衡二叉树定义为: 一个二叉树每…