场景说明
我们有10个任务需要主动发送到3台机器上并行执行,某一台机器执行完成再为此机器分配下一个任务
方案一:消息队列(被动调度)
此方案可以使用celery+redis实现简单的生产者消费者模型,步骤如下:
- 启动redis消息中间件
- 启动生产者把任务发送到redis消息队列中
- 在每台机器上的消费者worker监控消息队列
此方案被动式并行方案,执行机抢占式消费任务,自由度较小,此处不展开讲解
方案二:手动实现(主动调度)
代码如下:
代码仅作演示,实际并没有发送到机器,可通过在task_func自己实现调度别的机器
import time
import multiprocessing
import threading
import redis
tasks = [
'task_001',
'task_002',
'task_003',
'task_004',
'task_005',
'task_006',
'task_007',
'task_008',
'task_009',
'task_010',
]
wokers = [
"192.168.0.101",
"192.168.0.102",
"192.168.0.103",
]
redis_conn = redis.Redis(host='127.0.0.1', port=6379, db=0)
redis_conn.ltrim('users_id.tasks', 1, 0)
redis_conn.ltrim('users_id.wokers', 1, 0)
redis_conn.lpush('users_id.tasks', *tasks)
redis_conn.lpush('users_id.wokers', *wokers)
def task_func(worker, task):
time.sleep(1)
print(f"task:{task}已完成")
redis_conn.lpush('users_id.wokers', worker)
def main():
p_list = []
while True:
task = redis_conn.rpop('users_id.tasks')
if not task:
break
woker = redis_conn.brpop('users_id.wokers')
p = threading.Thread(target=task_func, args=(woker[1].decode(), task.decode()))
p.start()
p_list.append(p)
for i in p_list:
i.join()
print("所有任务已完成")
if __name__ == '__main__':
multiprocessing.freeze_support()
t = time.time()
main()
print(time.time() - t)
步骤说明
- 首先将任务和worker全部放到redis队列中
- 每次取出一个task,然后取出一个worker,然后可以编写逻辑将任务发送到worker上执行task
- 当任务还有剩余,而worker全部被取出时,程序阻塞等待直到有可用的worker
- 每个任务执行完成,将其worker重新放入队列中,被阻塞的程序可以获得worker继续执行
- 所有的任务全部取出后,退出循环,等待所有的任务都执行完毕
- 结束
结果说明
每个任务需要一秒的话,串行则需要十秒,如果3台机器同时,时间应该在4秒多
- 输出结果如下,可以看到时间正如预期,代表逻辑正常