【分布式】tensorflow 1 分布式代码实战与说明;单个节点上运行 2 个分布式worker工作线程

news2025/4/22 21:19:54

tensorflow.python.framework.errors_impl.UnknowError: Could not start
gRPC server

1. tf分布式

一台电脑=服务器=server是一个节点,包含了多个GPU。首先分布式的方式就是让多台电脑上的gpu共同干活。

分布式工作分为两个部分,parameter server(ps) 以及worker。眼熟ps与worker,因为这个是工作,每个server,都得干活,所以只能是从这两个工作里面选择。ps的工作类似于存储参数,而损失的计算,梯度的决定都是有worker进行的。这个对代码的影响就是,ps节点其实完全可以由cpu来做。worker必须由gpu做。

整体结构如图:一共四个sever,每个sever假设包含4个GPU,下图一共16个GPU。两个server工作是ps,两个sever的工作是worker,这个name其实没有在代码中配置,所以不用理会。server同做一个工作,也需要区分的,所以又引入了task,并且有task id。这里只是演示一下job(ps,worker)和server(节点)的关系。
在这里插入图片描述

2. 代码

代码的讲解是踩点来的。就是怎么用代码互相交流。

从理论上看,我们需要一些节点,并且给他们分配工作。

所以做一下程序入口接受参数(节点都是是谁,给什么工作了),我比较喜欢接收参数,不喜欢在代码里面写死。因为flags是tf基础,不想解释增加长度。

每个节点都得被单独通知,并且单独运行,这意味着如果你有一个ps,两个worker(一般用一个ps即可),你得在bash命令里:

python train.py --ps都谁(ps_hosts) --worker都谁(woker_hosts) --我被分配干啥(job_name) --我是第几个干这活的(task_index)

python train.py --ps都谁(ps_hosts) --worker都谁(woker_hosts) --我被分配干啥(job_name) --我是第几个干这活的(task_index)

python train.py --ps都谁(ps_hosts) --worker都谁(woker_hosts) --我被分配干啥(job_name) --我是第几个干这活的(task_index)

就是输入三次,跑三次,同时。ps和worker都会等你输完了在一起工作,毕竟要等同伴。

3. 示例

https://gist.github.com/yaroslavvb/1124bb02a9fd4abce3d86caf2f950cb2

"""Benchmark tensorflow distributed by adding vector of ones on worker2
to variable on worker1 as fast as possible.
On 2014 macbook, TensorFlow 0.10 this shows
Local rate:       2175.28 MB per second
Distributed rate: 107.13 MB per second
"""

import subprocess
import tensorflow as tf
import time
import sys

flags = tf.flags
flags.DEFINE_integer("iters", 10, "Maximum number of additions")
flags.DEFINE_integer("data_mb", 100, "size of vector in MBs")
flags.DEFINE_string("port1", "12222", "port of worker1")
flags.DEFINE_string("port2", "12223", "port of worker2")
flags.DEFINE_string("task", "", "internal use")
FLAGS = flags.FLAGS

# setup local cluster from flags
host = "127.0.0.1:"
cluster = {"worker": [host+FLAGS.port1, host+FLAGS.port2]}
clusterspec = tf.train.ClusterSpec(cluster).as_cluster_def()

def default_config():
  optimizer_options = tf.OptimizerOptions(opt_level=tf.OptimizerOptions.L0)
  config = tf.ConfigProto(
    graph_options=tf.GraphOptions(optimizer_options=optimizer_options))
  config.log_device_placement = False
  config.allow_soft_placement = False
  return config

def create_graph(device1, device2):
  """Create graph that keeps variable on device1 and
  vector of ones/addition op on device2"""
  
  tf.reset_default_graph()
  dtype=tf.int32
  params_size = 250*1000*FLAGS.data_mb # 1MB is 250k integers

  with tf.device(device1):
    params = tf.get_variable("params", [params_size], dtype,
                             initializer=tf.zeros_initializer)
  with tf.device(device2):
    # constant node gets placed on device1 because of simple_placer
    #    update = tf.constant(1, shape=[params_size], dtype=dtype)
    update = tf.get_variable("update", [params_size], dtype,
                             initializer=tf.ones_initializer)
    add_op = params.assign_add(update)
    
  init_op = tf.initialize_all_variables()
  return init_op, add_op

def run_benchmark(sess, init_op, add_op):
  """Returns MB/s rate of addition."""
  
  sess.run(init_op)
  sess.run(add_op.op)  # warm-up
  start_time = time.time()
  for i in range(FLAGS.iters):
    # change to add_op.op to make faster
    sess.run(add_op)
  elapsed_time = time.time() - start_time
  return float(FLAGS.iters)*FLAGS.data_mb/elapsed_time


def run_benchmark_local():
  ops = create_graph(None, None)
  sess = tf.Session(config=default_config())
  return run_benchmark(sess, *ops)


def run_benchmark_distributed():
  ops = create_graph("/job:worker/task:0", "/job:worker/task:1")

  # launch distributed service
  def runcmd(cmd): subprocess.Popen(cmd, shell=True, stderr=subprocess.STDOUT)
  runcmd("python %s --task=0"%(sys.argv[0]))
  runcmd("python %s --task=1"%(sys.argv[0]))
  time.sleep(1)

  sess = tf.Session("grpc://"+host+FLAGS.port1, config=default_config())
  return run_benchmark(sess, *ops)
  
if __name__=='__main__':
  if not FLAGS.task:

    rate1 = run_benchmark_local()
    rate2 = run_benchmark_distributed()

    print("Adding data in %d MB chunks" %(FLAGS.data_mb))
    print("Local rate:       %.2f MB per second" %(rate1,))
    print("Distributed rate: %.2f MB per second" %(rate2,))

  else: # Launch TensorFlow server
    server = tf.train.Server(clusterspec, config=default_config(),
                             job_name="worker",
                             task_index=int(FLAGS.task))
    server.join()

4. 单个节点上运行 2 个分布式worker工作线程

https://stackoverflow.com/questions/40877246/distributed-tensorflow-not-working-with-simple-example

https://github.com/ischlag/distributed-tensorflow-example/blob/master/example.py

'''
Distributed Tensorflow 1.2.0 example of using data parallelism and share model parameters.
Trains a simple sigmoid neural network on mnist for 20 epochs on three machines using one parameter server. 

Change the hardcoded host urls below with your own hosts. 
Run like this: 

pc-01$ python example.py --job_name="ps" --task_index=0 
pc-02$ python example.py --job_name="worker" --task_index=0 
pc-03$ python example.py --job_name="worker" --task_index=1 
pc-04$ python example.py --job_name="worker" --task_index=2 

More details here: ischlag.github.io
'''

from __future__ import print_function

import tensorflow as tf
import sys
import time

# cluster specification
parameter_servers = ["pc-01:2222"]
workers = [	"pc-02:2222", 
			"pc-03:2222",
			"pc-04:2222"]
cluster = tf.train.ClusterSpec({"ps":parameter_servers, "worker":workers})

# input flags
tf.app.flags.DEFINE_string("job_name", "", "Either 'ps' or 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
FLAGS = tf.app.flags.FLAGS

# start a server for a specific task
server = tf.train.Server(
    cluster,
    job_name=FLAGS.job_name,
    task_index=FLAGS.task_index)

# config
batch_size = 100
learning_rate = 0.0005
training_epochs = 20
logs_path = "/tmp/mnist/1"

# load mnist data set
from tensorflow.examples.tutorials.mnist import input_data
mnist = input_data.read_data_sets('MNIST_data', one_hot=True)

if FLAGS.job_name == "ps":
    server.join()
elif FLAGS.job_name == "worker":

	# Between-graph replication
	with tf.device(tf.train.replica_device_setter(
		worker_device="/job:worker/task:%d" % FLAGS.task_index,
		cluster=cluster)):

		# count the number of updates
		global_step = tf.get_variable(
            'global_step',
            [],
            initializer = tf.constant_initializer(0),
			trainable = False)

		# input images
		with tf.name_scope('input'):
		  # None -> batch size can be any size, 784 -> flattened mnist image
		  x = tf.placeholder(tf.float32, shape=[None, 784], name="x-input")
		  # target 10 output classes
		  y_ = tf.placeholder(tf.float32, shape=[None, 10], name="y-input")

		# model parameters will change during training so we use tf.Variable
		tf.set_random_seed(1)
		with tf.name_scope("weights"):
			W1 = tf.Variable(tf.random_normal([784, 100]))
			W2 = tf.Variable(tf.random_normal([100, 10]))

		# bias
		with tf.name_scope("biases"):
			b1 = tf.Variable(tf.zeros([100]))
			b2 = tf.Variable(tf.zeros([10]))

		# implement model
		with tf.name_scope("softmax"):
			# y is our prediction
			z2 = tf.add(tf.matmul(x,W1),b1)
			a2 = tf.nn.sigmoid(z2)
			z3 = tf.add(tf.matmul(a2,W2),b2)
			y  = tf.nn.softmax(z3)

		# specify cost function
		with tf.name_scope('cross_entropy'):
			# this is our cost
			cross_entropy = tf.reduce_mean(
                -tf.reduce_sum(y_ * tf.log(y), reduction_indices=[1]))

		# specify optimizer
		with tf.name_scope('train'):
			# optimizer is an "operation" which we can execute in a session
			grad_op = tf.train.GradientDescentOptimizer(learning_rate)
			'''
			rep_op = tf.train.SyncReplicasOptimizer(
                grad_op,
			    replicas_to_aggregate=len(workers),
 				replica_id=FLAGS.task_index, 
 			    total_num_replicas=len(workers),
 				use_locking=True)
 			train_op = rep_op.minimize(cross_entropy, global_step=global_step)
 			'''
			train_op = grad_op.minimize(cross_entropy, global_step=global_step)
			
		'''
		init_token_op = rep_op.get_init_tokens_op()
		chief_queue_runner = rep_op.get_chief_queue_runner()
		'''

		with tf.name_scope('Accuracy'):
			# accuracy
			correct_prediction = tf.equal(tf.argmax(y,1), tf.argmax(y_,1))
			accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

		# create a summary for our cost and accuracy
		tf.summary.scalar("cost", cross_entropy)
		tf.summary.scalar("accuracy", accuracy)

		# merge all summaries into a single "operation" which we can execute in a session 
		summary_op = tf.summary.merge_all()
		init_op = tf.global_variables_initializer()
		print("Variables initialized ...")

	sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
														global_step=global_step,
														init_op=init_op)

	begin_time = time.time()
	frequency = 100
	with sv.prepare_or_wait_for_session(server.target) as sess:
		'''
		# is chief
		if FLAGS.task_index == 0:
			sv.start_queue_runners(sess, [chief_queue_runner])
			sess.run(init_token_op)
		'''
		# create log writer object (this will log on every machine)
		writer = tf.summary.FileWriter(logs_path, graph=tf.get_default_graph())
				
		# perform training cycles
		start_time = time.time()
		for epoch in range(training_epochs):

			# number of batches in one epoch
			batch_count = int(mnist.train.num_examples/batch_size)

			count = 0
			for i in range(batch_count):
				batch_x, batch_y = mnist.train.next_batch(batch_size)
				
				# perform the operations we defined earlier on batch
				_, cost, summary, step = sess.run(
												[train_op, cross_entropy, summary_op, global_step], 
												feed_dict={x: batch_x, y_: batch_y})
				writer.add_summary(summary, step)

				count += 1
				if count % frequency == 0 or i+1 == batch_count:
					elapsed_time = time.time() - start_time
					start_time = time.time()
					print("Step: %d," % (step+1), 
								" Epoch: %2d," % (epoch+1), 
								" Batch: %3d of %3d," % (i+1, batch_count), 
								" Cost: %.4f," % cost, 
								" AvgTime: %3.2fms" % float(elapsed_time*1000/frequency))
					count = 0


		print("Test-Accuracy: %2.2f" % sess.run(accuracy, feed_dict={x: mnist.test.images, y_: mnist.test.labels}))
		print("Total Time: %3.2fs" % float(time.time() - begin_time))
		print("Final Cost: %.4f" % cost)

	sv.stop()
	print("done")

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1197057.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Practice01-Qt6.0设置文本颜色、格式等。

Qt6.0学习,在此做个记录,方便日后查找复习 本次项目用到的控件有:复选框,单选按钮。文本编辑框。 项目目录结构: 项目运行效果图: 实现的功能: 勾选Underline、Italic,Bold时&…

BUUCTF——刮开有奖

打开程序: 就一个这个玩意儿,没有输入框,没有啥的,打开IDA反编译一下: 直接找到WinMain,发现里面只有一个对话框API(如果只有一个对话框,那真就没有输入框了)&#xff0…

人工智能基础——Python:Matplotlib与绘图设计

人工智能的学习之路非常漫长,不少人因为学习路线不对或者学习内容不够专业而举步难行。不过别担心,我为大家整理了一份600多G的学习资源,基本上涵盖了人工智能学习的所有内容。点击下方链接,0元进群领取学习资源,让你的学习之路更加顺畅!记得…

Android---屏幕适配的处理技巧

在几年前,屏幕适配一直是困扰 Android 开发工程师的一大问题。但是随着近几年各种屏幕适配方案的诞生,以及谷歌各种适配控件的推出,屏幕适配也显得越来越容易。下面,我们就来总结一下关于屏幕适配的那些技巧。 ConstraintLayout …

【数据结构】二叉树经典例题---<你真的掌握二叉树了吗?>(第一弹)

一、已知一颗二叉树如下图,试求: (1)该二叉树前序、中序和后序遍历的结果。 (2)该二叉树是否为满二叉树?是否为完全二叉树? (3)将它转换成对应的树或森林。 (4)这颗二叉树的深度为多少? (5)试对该二叉树进行前序线索化。 (6)试对…

向量的点积和外积

参考:https://www.cnblogs.com/gxcdream/p/7597865.html 一、向量的内积(点乘) 定义: 两个向量a与b的内积为 ab |a||b|cos∠(a, b),特别地,0a a0 0;若a,b是非零向量,…

Shopee收款账户怎么设置?shopee收款方式选哪种

Shopee作为一家领先的电子商务平台,为卖家提供了多种收款方式。无论是在线支付、虚拟账户余额还是线下支付,卖家可以根据自己的需求和交易情况来进行选择。然而,在选择收款方式时,安全性、便捷性和市场适应性是需要考虑虾皮Shopee…

【Git】Git的GUI图形化工具ssh协议IDEA集成Git

一、GIT的GUI图形化工具 1、介绍 Git自带的GUI工具,主界面中各个按钮的意思基本与界面文字一致,与git的命令差别不大。在了解自己所做的操作情况下,各个功能点开看下就知道是怎么操作的。即使不了解,只要不做push操作,…

【数据结构】顺序表 | 详细讲解

在计算机中主要有两种基本的存储结构用于存放线性表:顺序存储结构和链式存储结构。本篇文章介绍采用顺序存储的结构实现线性表的存储。 顺序存储定义 线性表的顺序存储结构,指的是一段地址连续的存储单元依次存储链性表的数据元素。 线性表的&#xf…

Activiti BPMN visualizer Using Of Idear

Launch 安装插件 创建文件 可视化创建按钮 设置条件,是在线上设置的

【C++破局】C++内存管理之new与deleted剖析

​作者主页 📚lovewold少个r博客主页 ⚠️本文重点:c内存管理部分知识点梳理 👉【C-C入门系列专栏】:博客文章专栏传送门 😄每日一言:花有重开日,人无再少年! 目录 C/C的内存分配机…

Vue中的常用指令v-html / v-show / v-if / v-else / v-on / v-bind / v-for / v-model

前言 持续学习总结输出中,Vue中的常用指令v-html / v-show / v-if / v-else / v-on / v-bind / v-for / v-model 概念:指令(Directives)是Vue提供的带有 v- 前缀 的特殊标签属性。可以提高操作 DOM 的效率。 vue 中的指令按照不…

Java Web——HTTP协议

目录 1. HTTP协议概述 1.1. HTTP数据传输格式 1.2. HTTP协议特点 2. HTTP 1.0和HTTP 1.1 3. HTTP请求协议 3.1. GET方式请求协议 3.2. POST方式请求协议 3.3. GET请求和POST请求的区别 4. HTTP相应协议 4.1. 响应状态码 如果两个国家进行会晤需要遵守一定的礼节。所以…

WMS配送中心主要业务流程

业务流程图 入库 波次出库 按门店和门店所属送货路线确定出库波次 入库 出库 移库、封仓 门店欠货能要点 1. 日常补货:分拣仓位商品小于当前商品在该位置的补货下限的时候;生成对此进行补货任务;补货完成后确认任务,系统变更库存…

win10使用mingw安装OpenCV4.8

1. cmake安装 下载链接如下https://github.com/Kitware/CMake/releases/download/v3.27.7/cmake-3.27.7-windows-x86_64.zip 解压后放到指定目录后,添加bin目录到环境变量即可。 2. mingw安装 下载链接如下(下图的x86_64-posix-sjlj): Download x86_…

DevChat:提升编程效率的AI编程助手

一、前言 1、当前开发的痛点😖 在软件开发过程中,开发者经常需要编写复杂的代码,如数据结构、算法、网络通信等,这些都需要耗费大量的时间和精力。同时,不同的编程语言和框架也会给开发者带来许多不便,例如…

Hadoop入门——数据分析基本步骤

文章目录 1.概述2.分析步骤2.1第一步 明确分析目的和思路2.2第二步 数据收集2.3第三步 数据处理2.4第四步 数据分析2.5第五步 数据展现2.6第六步 报告撰写 3.总结 1.概述 2.分析步骤 2.1第一步 明确分析目的和思路 2.2第二步 数据收集 2.3第三步 数据处理 2.4第四步 数据分析 …

C语言每日一题(28) 反转链表

牛客网 BM1 反转链表 题目描述 描述 给定一个单链表的头结点pHead(该头节点是有值的,比如在下图,它的val是1),长度为n,反转该链表后,返回新链表的表头。 数据范围: 0≤n≤1000 要求:空间复…

数据分析实战 | SVM算法——病例自动诊断分析

目录 一、数据分析及对象 二、目的及分析任务 三、方法及工具 四、数据读入 五、数据理解 六、数据准备 七、模型训练 八、模型应用及评价 一、数据分析及对象 CSV文件——“bc_data.csv” 数据集链接:https://download.csdn.net/download/m0_70452407/88…

【C++】智能指针(一)

这篇文章介绍下C的智能指针,当然,可能没有你想的那么智能。 为什么需要智能指针1 void remodel(string& str) {string* ps new string(str);str *ps;return; }这里不讨论这个函数有没有意义,在这段代码中,很明显&#xff…