Python 进程与线程-分布式进程

news2025/3/14 20:07:46

目录

分布式进程

小结


分布式进程

在Thread和Process中,应当优选Process,因为Process更稳定,而且,Process可以分布到多台机器上,而Thread最多只能分布到同一台机器的多个CPU上。

Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序。

举个例子:如果我们已经有一个通过Queue通信的多进程程序在同一台机器上运行,现在,由于处理任务的进程任务繁重,希望把发送任务的进程和处理任务的进程分布到两台机器上。怎么用分布式进程实现?

原有的Queue可以继续使用,但是,通过managers模块把Queue通过网络暴露出去,就可以让其他机器的进程访问Queue了。

我们先看服务进程,服务进程负责启动Queue,把Queue注册到网络上,然后往Queue里面写入任务:

# task_master.py

import random, time, queue
from multiprocessing.managers import BaseManager

# 发送任务的队列:
task_queue = queue.Queue()
# 接收结果的队列:
result_queue = queue.Queue()

# 从BaseManager继承的QueueManager:
class QueueManager(BaseManager):
    pass

# 把两个Queue都注册到网络上, callable参数关联了Queue对象:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 绑定端口5000, 设置验证码'abc':
manager = QueueManager(address=('', 5000), authkey=b'abc')
# 启动Queue:
manager.start()
# 获得通过网络访问的Queue对象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放几个任务进去:
for i in range(10):
    n = random.randint(0, 10000)
    print('Put task %d...' % n)
    task.put(n)
# 从result队列读取结果:
print('Try get results...')
for i in range(10):
    r = result.get(timeout=10)
    print('Result: %s' % r)
# 关闭:
manager.shutdown()
print('master exit.')

请注意,当我们在一台机器上写多进程程序时,创建的Queue可以直接拿来用,但是,在分布式多进程环境下,添加任务到Queue不可以直接对原始的task_queue进行操作,那样就绕过了QueueManager的封装,必须通过manager.get_task_queue()获得的Queue接口添加。

然后,在另一台机器上启动任务进程(本机上启动也可以):

# task_worker.py

import time, sys, queue
from multiprocessing.managers import BaseManager

# 创建类似的QueueManager:
class QueueManager(BaseManager):
    pass

# 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# 连接到服务器,也就是运行task_master.py的机器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和验证码注意保持与task_master.py设置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 从网络连接:
m.connect()
# 获取Queue的对象:
task = m.get_task_queue()
result = m.get_result_queue()
# 从task队列取任务,并把结果写入result队列:
for i in range(10):
    try:
        n = task.get(timeout=1)
        print('run task %d * %d...' % (n, n))
        r = '%d * %d = %d' % (n, n, n*n)
        time.sleep(1)
        result.put(r)
    except Queue.Empty:
        print('task queue is empty.')
# 处理结束:
print('worker exit.')

任务进程要通过网络连接到服务进程,所以要指定服务进程的IP。

现在,可以试试分布式进程的工作效果了。先启动task_master.py服务进程:

$ python3 task_master.py 
Put task 3411...
Put task 1605...
Put task 1398...
Put task 4729...
Put task 5300...
Put task 7471...
Put task 68...
Put task 4219...
Put task 339...
Put task 7866...
Try get results...

task_master.py进程发送完任务后,开始等待result队列的结果。现在启动task_worker.py进程:

$ python3 task_worker.py
Connect to server 127.0.0.1...
run task 3411 * 3411...
run task 1605 * 1605...
run task 1398 * 1398...
run task 4729 * 4729...
run task 5300 * 5300...
run task 7471 * 7471...
run task 68 * 68...
run task 4219 * 4219...
run task 339 * 339...
run task 7866 * 7866...
worker exit.

task_worker.py进程结束,在task_master.py进程中会继续打印出结果:

Result: 3411 * 3411 = 11634921
Result: 1605 * 1605 = 2576025
Result: 1398 * 1398 = 1954404
Result: 4729 * 4729 = 22363441
Result: 5300 * 5300 = 28090000
Result: 7471 * 7471 = 55815841
Result: 68 * 68 = 4624
Result: 4219 * 4219 = 17799961
Result: 339 * 339 = 114921
Result: 7866 * 7866 = 61873956

这个简单的Master/Worker模型有什么用?其实这就是一个简单但真正的分布式计算,把代码稍加改造,启动多个worker,就可以把任务分布到几台甚至几十台机器上,比如把计算n*n的代码换成发送邮件,就实现了邮件队列的异步发送。

Queue对象存储在哪?注意到task_worker.py中根本没有创建Queue的代码,所以,Queue对象存储在task_master.py进程中:

Queue之所以能通过网络访问,就是通过QueueManager实现的。由于QueueManager管理的不止一个Queue,所以,要给每个Queue的网络调用接口起个名字,比如get_task_queue

authkey有什么用?这是为了保证两台机器正常通信,不被其他机器恶意干扰。如果task_worker.pyauthkeytask_master.pyauthkey不一致,肯定连接不上。

小结

Python的分布式进程接口简单,封装良好,适合需要把繁重任务分布到多台机器的环境下。

注意Queue的作用是用来传递任务和接收结果,每个任务的描述数据量要尽量小。比如发送一个处理日志文件的任务,就不要发送几百兆的日志文件本身,而是发送日志文件存放的完整路径,由Worker进程再去共享的磁盘上读取文件。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2315054.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

初阶数据结构(C语言实现)——5.2 二叉树的顺序结构及堆的实现

1.二叉树的顺序结构及实现 1.1 二叉树的顺序结构 普通的二叉树是不适合用数组来存储的,因为可能会存在大量的空间浪费。而完全二叉树更适合使用顺序结构存储。现实中我们通常把堆(一种二叉树)使用顺序结构的数组来存储,需要注意的是这里的堆和操作系统…

ArcGIS Pro 车牌分区数据处理与地图制作全攻略

在大数据时代,地理信息系统(GIS)技术在各个领域都有着广泛的应用,而 ArcGIS Pro 作为一款功能强大的 GIS 软件,为数据处理和地图制作提供了丰富的工具和便捷的操作流程。 车牌数据作为一种重要的地理空间数据&#xf…

文件解析漏洞靶场通关合集

一、IIS解析漏洞 &#xff08;一&#xff09;iis6的目录解析漏洞(.asp目录中的所有文件都会被当做asp文件执行) 第一步&#xff1a;在网站根目录下创建了一个x.asp文件夹&#xff0c;并在文件夹中创建一个名为1.txt的文本文档 第二步&#xff1a;文本文档中输入<% now()%&…

塔能IVO-SCY智能机箱:点亮智慧城市的电力“智慧核芯”

在智慧城市建设的宏大征程中&#xff0c;稳定且智能的电力供应犹如坚固基石&#xff0c;支撑着各类设备高效、稳定地运行。塔能科技的IVO-SCY智能机箱&#xff0c;凭借其卓越的电源管理系统&#xff0c;当之无愧地成为了整个智慧城市电力保障体系中的“智慧心脏”&#xff0c;源…

【Oracle】19c数据库控制文件多路径配置

一、关闭数据库&#xff08;2个节点实例都要关闭&#xff09; srvctl stop database -d ora19c 二、多路径控制文件 打开其中一个节点到nomount状态 sqlplus / as sysdba startup nomount; [oracleora19c1:/home/oracle]$ rman target / RMAN> restore controlfile to…

Android Media3 ExoPlayer 开发全攻略:从基础集成到高级功能实战

目录 1. 引言 2. 添加依赖 3. 初始化ExoPlayer并播放视频 3.1 XML 布局 3.2 初始化ExoPlayer 4. 控制播放 5. 监听播放状态 6. 播放网络流&#xff08;HLS / DASH / RTSP&#xff09; 7. ExoPlayer 进阶 7.1 手动切换功能 7.2 DRM 保护 8. 释放播放器资源 9. 从旧…

Trae与Builder模式初体验

说明 下载的国际版&#xff1a;https://www.trae.ai/ 建议 要选新模型 效果 还是挺不错的&#xff0c;遇到问题反馈一下&#xff0c;AI就帮忙解决了&#xff0c;真是动动嘴&#xff08;打打字就行了&#xff09;&#xff0c;做些小的原型效果或演示Demo很方便呀&#xff…

如何通过修改hosts文件、启动Apache服务器、修改httpd.conf文件、配置虚拟主机、创建站点目录和文件等步骤来配置虚拟主机并发布PHP站点

Web服务器配置——修改hosts文件&#xff0c;将域名解析到本地 核心内容&#xff1a;介绍了如何通过修改hosts文件来实现将任意域名解析到本地&#xff0c;以便在开发过程中使用自定义域名访问本地站点。步骤&#xff1a; 打开位于C:\Windows\System32\drivers\etc的hosts文件…

政策助力,3C 数码行业数字化起航

政策引领&#xff0c;数字经济浪潮来袭 在当今时代&#xff0c;数字经济已成为全球经济发展的核心驱动力&#xff0c;引领着新一轮科技革命和产业变革的潮流。我国深刻洞察这一发展趋势&#xff0c;大力推进数字化经济发展战略&#xff0c;为经济的高质量发展注入了强大动力。 …

MySQL数据库复制

文章目录 MySQL数据库复制一、复制的原理二、复制的搭建1.编辑配置文件2.在主库上创建复制的用户3.获取主库的备份4.基于从库的恢复5.建立主从复制6.开启主从复制7.查看主从复制状态 MySQL数据库复制 MySQL作为非常流行的数据库&#xff0c;支撑它如此出彩的因素主要有两个&am…

101.在 Vue 3 + OpenLayers 使用 declutter 避免文字标签重叠

1. 前言 在使用 OpenLayers 进行地图开发时&#xff0c;我们经常需要在地图上添加点、线、区域等图形&#xff0c;并给它们附加文字标签。但当地图上的标注较多时&#xff0c;文字标签可能会发生重叠&#xff0c;导致用户无法清晰地查看地图信息。 幸运的是&#xff0c;OpenL…

uniapp移动端图片比较器组件,仿英伟达官网rtx光追图片比较器功能

组件下载地址&#xff1a;https://ext.dcloud.net.cn/plugin?id22609 已测试h5和微信小程序&#xff0c;理论支持全平台 亮点&#xff1a; 简单易用 使用js计算而不是resize属性&#xff0c;定制化程度更高 组件挂在后可播放指示线动画&#xff0c;提示用户可以拖拽比较图片…

深度学习与大模型-矩阵

矩阵其实在我们的生活中也有很多应用&#xff0c;只是我们没注意罢了。 1. 矩阵是什么&#xff1f; 简单来说&#xff0c;矩阵就是一个长方形的数字表格。比如你有一个2行3列的矩阵&#xff0c;可以写成这样&#xff1a; 这个矩阵有2行3列&#xff0c;每个数字都有一个位置&a…

搭建基于chatgpt的问答系统

一、语言模型&#xff0c;提问范式与 Token 1.语言模型 大语言模型&#xff08;LLM&#xff09;是通过预测下一个词的监督学习方式进行训练的&#xff0c;通过预测下一个词为训练目标的方法使得语言模型获得强大的语言生成能力。 a.基础语言模型 &#xff08;Base LLM&…

LuaJIT 学习(2)—— 使用 FFI 库的几个例子

文章目录 介绍Motivating Example: Calling External C Functions例子&#xff1a;Lua 中调用 C 函数 Motivating Example: Using C Data StructuresAccessing Standard System FunctionsAccessing the zlib Compression LibraryDefining Metamethods for a C Type例子&#xf…

解锁 AI 开发的无限可能:邀请您加入 coze-sharp 开源项目

大家好&#xff01;今天我要向大家介绍一个充满潜力的开源项目——coze-sharp&#xff01;这是一个基于 C# 开发的 Coze 客户端&#xff0c;旨在帮助开发者轻松接入 Coze AI 平台&#xff0c;打造智能应用。项目地址在这里&#xff1a;https://github.com/zhulige/coze-sharp&a…

全面解析与实用指南:如何有效解决ffmpeg.dll丢失问题并恢复软件正常运行

在使用多媒体处理软件或进行视频编辑时&#xff0c;你可能会遇到一个常见的问题——ffmpeg.dll文件丢失。这个错误不仅会中断你的工作流程&#xff0c;还可能导致软件无法正常运行。ffmpeg.dll是FFmpeg库中的一个关键动态链接库文件&#xff0c;负责处理视频和音频的编码、解码…

Python----计算机视觉处理(opencv:像素,RGB颜色,图像的存储,opencv安装,代码展示)

一、计算机眼中的图像 像素 像素是图像的基本单元&#xff0c;每个像素存储着图像的颜色、亮度和其他特征。一系列像素组合到一起就形成 了完整的图像&#xff0c;在计算机中&#xff0c;图像以像素的形式存在并采用二进制格式进行存储。根据图像的颜色不 同&#xff0c;每个像…

小米路由器SSH下安装DDNS-GO

文章目录 前言一、下载&#xff06;安装DDNS-GO二、配置ddns-go设置开机启动 前言 什么是DDNS&#xff1f; DDNS&#xff08;Dynamic Domain Name Server&#xff09;是动态域名服务的缩写。 目前路由器拨号上网获得的多半都是动态IP&#xff0c;DDNS可以将路由器变化的外网I…

go语言zero框架拉取内部平台开发的sdk报错的修复与实践

在开发过程中&#xff0c;我们可能会遇到由于认证问题无法拉取私有 SDK 的情况。这种情况常发生在使用 Go 语言以及 Zero 框架时&#xff0c;尤其是在连接到私有平台&#xff0c;如阿里云 Codeup 上托管的 Go SDK。如果你遇到这种错误&#xff0c;通常是因为 Go 没有适当的认证…