python实现简单的多机并行调度

news2024/11/16 7:44:47

场景说明

我们有10个任务需要主动发送到3台机器上并行执行,某一台机器执行完成再为此机器分配下一个任务

方案一:消息队列(被动调度)

此方案可以使用celery+redis实现简单的生产者消费者模型,步骤如下:

  • 启动redis消息中间件
  • 启动生产者把任务发送到redis消息队列中
  • 在每台机器上的消费者worker监控消息队列
    此方案被动式并行方案,执行机抢占式消费任务,自由度较小,此处不展开讲解

方案二:手动实现(主动调度)

代码如下:
代码仅作演示,实际并没有发送到机器,可通过在task_func自己实现调度别的机器

import time
import multiprocessing
import threading

import redis

tasks = [
    'task_001',
    'task_002',
    'task_003',
    'task_004',
    'task_005',
    'task_006',
    'task_007',
    'task_008',
    'task_009',
    'task_010',
]
wokers = [
    "192.168.0.101",
    "192.168.0.102",
    "192.168.0.103",
]
redis_conn = redis.Redis(host='127.0.0.1', port=6379, db=0)
redis_conn.ltrim('users_id.tasks', 1, 0)
redis_conn.ltrim('users_id.wokers', 1, 0)
redis_conn.lpush('users_id.tasks', *tasks)
redis_conn.lpush('users_id.wokers', *wokers)


def task_func(worker, task):
    time.sleep(1)
    print(f"task:{task}已完成")
    redis_conn.lpush('users_id.wokers', worker)


def main():
    p_list = []
    while True:
        task = redis_conn.rpop('users_id.tasks')
        if not task:
            break
        woker = redis_conn.brpop('users_id.wokers')
        p = threading.Thread(target=task_func, args=(woker[1].decode(), task.decode()))
        p.start()
        p_list.append(p)
    for i in p_list:
        i.join()
    print("所有任务已完成")


if __name__ == '__main__':
    multiprocessing.freeze_support()
    t = time.time()
    main()
    print(time.time() - t)

步骤说明

  • 首先将任务和worker全部放到redis队列中
  • 每次取出一个task,然后取出一个worker,然后可以编写逻辑将任务发送到worker上执行task
  • 当任务还有剩余,而worker全部被取出时,程序阻塞等待直到有可用的worker
  • 每个任务执行完成,将其worker重新放入队列中,被阻塞的程序可以获得worker继续执行
  • 所有的任务全部取出后,退出循环,等待所有的任务都执行完毕
  • 结束

结果说明

每个任务需要一秒的话,串行则需要十秒,如果3台机器同时,时间应该在4秒多

  • 输出结果如下,可以看到时间正如预期,代表逻辑正常
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/623606.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

关于单片机的时钟浅谈及STM32F103/F030单片机的内外时钟切换问题

绪论 本文主要讲解单片机的时钟系统的相关知识,并进行超频测试,同时介绍如何在STM32F0单片机上进行内外时钟的切换,在不使用外部晶振或者外部晶振不启动时自动切换内部时钟的方法。 一、杂谈 问题来源于群里的一次问答: 诚然&…

Flatpickr教程:使用JavaScript快速创建一个自定义日期选择器

部分数据来源:ChatGPT 引言 如果您是一个网站开发者,想为自己的网站添加方便易用的日期选择对话框,那么Flatpickr日期选择对话框可能正好符合您的需要。在这篇文章中,我们将详细介绍如何使用Flatpickr日期选择对话框&#xff0c…

容器集群管理工具 Docker Swarm

前言 《了解和使用Docker》中有提到容器编排工具 docker compose ,不过只限于单机。如果现在需要搭建一个集群环境,提供了10台服务器用来部署应用以及其依赖的组件,比如5个 Tomcat 应用容器、3个Redis、5个 Mysql、3个 Nginx ,你…

Share Creators Ada Liu 与 VNG Christopher. Liu C出席 2023 全球游戏产业峰会

夏日将至,第二十届中国国际数码互动娱乐展览会(ChinaJoy)将于 2023 年 7 月 28 日至 7 月 31 日在上海新国际博览中心隆重举办。 本届 ChinaJoy 将带来多场重磅主题高端会议,其中全球游戏产业峰会将于 7 月 29 日在上海浦东嘉里大…

网安大佬常用的10大工具

从事网络安全工作,手上自然离不开一些重要的网络安全工具。今天,分享10大网络安全工具。 一、Kali Linux Kali 是一个基于 Debian 的 Linux 发行版。它的目标就是为了简单:在一个实用的工具包里尽可能多的包含渗透和审计工具。Kali 实现了这…

互联网大厂面试必备——1685页《Java 面试突击核心手册,二十大专题,覆盖2000道 Java后端核心面试解析

前言 不论是校招还是社招都避免不了各种面试。笔试,如何去准备这些东西就显得格外重要。不论是笔试还是面试都是有章可循的,我这个有章可循‘说的意思只是说应对技术面试是可以提前准备。 运筹帷幄之后,决胜千里之外!不打毫无准备的仗,我觉…

HikariCP:一个叫光的JDBC连接池

文章目录 简介数据库连接池C3P0DBCPBoneCP 精简的设计字节码优化ArrayList-->FastListConcurrentBag代理实现Statement CacheScheduler quantaCPU缓存行失效 优雅的实现获取连接初始化池对象连接池管理连接池扩充连接池缩容连接池关闭 ConcurrentBag 连接池参数总结参考 简介…

网络协议分析:网络性能的防御工具

作为网络管理员知道管理不断发展的 IT 环境需要付出巨大的努力。无论是对于小型还是大型企业,管理网络以使其可访问并使其性能有效都需要一套监控策略和工具。大多数 IT 管理员需要协议分析器来识别潜在的网络风险并帮助排除故障。与传统分析不同,协议分…

PPT中彩虹线-变色线是怎么画出来的?

​ 效果 上面用箭头指出的线框处,各位可以看到这种有多种颜色组成的渐变的就叫彩虹线 彩虹线是怎么设置的? 请看下面的操作步骤 此处,请单击选中你要变色的线,然后我们点击鼠标右键,在弹出的菜单中选择“设置形状格式" ​ 然后你会在PPT右边得到这样的一个界面…

脑机接口 | 面向步态神经电生理研究的非人灵长类模型与系统

近期,海南大学生物医学工程学院脑机芯片神经工程团队在Frontiers in Neuroscience期刊上发表了题为《面向步态&神经电生理研究的非人灵长类模型与系统》的学术论文。海南大学生物医学工程学院梁丰研副教授为第一作者,殷明教授为通讯作者。海南大学为…

2023年上半年软件设计师试题及答案解析

请点击↑关注、收藏,本博客免费为你获取精彩知识分享!有惊喜哟!! 计算机中,系统总线用于 (1) 连接。 (1) A. 接口和外设 B. 运算器、控制器和寄存器 C. CPU、主存及…

经典面试题---【第一档】

1.如果你想new一个Quene,你有几种方式?他们之间的区别是什么? 2.Redis 是如何判断数据是否过期的呢? Redis 通过一个叫做过期字典(可以看作是 hash 表)来保存数据过期的时间。过期字典的键指向 Redis 数据…

R语言脚本:关于 TissueEnrich包 得到的组织特异性基因富集结果的进一步处理

1. 说明 (来自官方文档): The TissueEnrich package is used to calculate enrichment of tissue-specific genes in a set of input genes. Tissue-specific genes were defined by processing RNA-Seq data from the Human Protein Atlas (HPA) (Uhln et al. 2015…

HttpServlet概述

HTTP协议包括: 请求协议:浏览器向WEB服务器发送数据的时候,这个发送的数据需要遵循一套标准,这套标准中规定了发送的数据具体格式。 相应协议:WEB服务器向浏览器发送数据的时候,这个发送的数据需要遵循一套标准,这套标准中规定了发…

【2023年首次更新】MyEclipse v2023.1支持Java 20

MyEclipse让您在开发过程中不受技术约束,不断创新帮您找到关键技术的解决方案您能在这里得到Java EE开发所需要的一切支持! MyEclipse v2023.1官方正式版下载 更新日志如下: MyEclipse官方近期更新了2023年第一个版本——v2023.1&#xff…

Nat.Commun. : 新的硬件将扩大量子计算机的工业应用规模

光子盒研究院 由明尼苏达大学双城分校领导的一个团队开发了一种新的超导二极管——这是电子设备中的一个关键部件,可以帮助扩大量子计算机的工业使用规模,并提高人工智能系统的性能。与其他超导二极管相比,研究人员的装置更加节能、可以同时处…

看过才知道,这套SpringCloudAlibaba笔记,把微服务玩的出神入化!

Spring Cloud Alibaba 致力于提供微服务开发的一站式解决方案。此项目包含开发分布式应用微服务的必需组件,依托Spring Cloud Alibaba,只需要添加一些注解和少量配置,就可以将Spring Cloud 应用接入阿里微服务解决方案,通过阿里中…

pwn(2)-栈溢出下

32位shellcode编写 不同内核态操作通过给寄存器设置不同的值,在调用指令int 80h,就可以通知内核完成不同的功能。 只要我们通过特定的汇编代码把特定的寄存器设定为特定的值后,在调用int 80h执行sys_execve(“/bin/sh”,NULL,NULL)就可以获…

Python获取链家二手房源数据信息

前言 嗨喽,大家好呀~这里是爱看美女的茜茜呐 环境使用: Python 3.8 Pycharm 模块使用: requests >>> pip install requests 数据请求模块 parsel >>> pip install parsel 数据解析模块 csv 内置模块 👇 👇 &#x1…

OJ#203.身高排序

题目描述 ​ 海贼小学为了强健学生的身体,每天课间都要组织学生在户外学做广播体操。​ 这一天,五年级三班的所有同学在老师的指引下将队形排成了 M行 N 列。 现已知所有同学 的身高,数值为整数,单位:厘米。要求在所有…