分布式的TensorFlow




该文档讲述了如何创建一个集群的tensorflow服务器,以及如何分配在集群计算图。我们假设你熟悉写作tensorflow程序的基本概念。





Hello distributed TensorFlow!

演示一个简单的TensorFlow集群,执行以下命令::

# Start a TensorFlow server as a single-process "cluster".
$ python
>>> import tensorflow as tf
>>> c = tf.constant("Hello, distributed TensorFlow!")
>>> server = tf.train.Server.create_local_server()
>>> sess = tf.Session(server.target)  # Create a session on the server.
>>> sess.run(c)
'Hello, distributed TensorFlow!'

tf.train.Server.create_local_server() 方法使用中间服务器集群创建了一个单一的进程集群。





创建一个集群

一个tensorflow“集群”是一套“任务”,参加一个tensorflow图的分布式执行。每个任务都与一个tensorflow“服务器”相关,其中包含一个“master”, 可以用来创建会话,和“worker”,执行操作的图。集群也可以分为一个或多个“作业”,其中每个作业包含一个或多个任务。

创建一个集群,你开始在每一tensorflow服务器集群的任务。每项任务通常运行在不同的机器上, 但是你可以运行多个任务在同一台机器上(例如,控制不同的GPU设备)。在每个任务中,做以下操作::

  1. 创建一个 tf.train.ClusterSpec t描述集群中的所有任务。对于每一个任务,这应该是相同的。

  2. 创建一个 tf.train.Server, 通过 tf.train.ClusterSpec 的构造函数,并确定本地任务的工作名称和任务index。







创建一个 tf.train.ClusterSpec 描述集群

集群规范词典地图工作网络地址列表名称。 把列表传给 tf.train.ClusterSpec 的构造函数. 例如:

tf.train.ClusterSpec constructionAvailable tasks
tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})
/job:local/task:0
/job:local/task:1
tf.train.ClusterSpec({
    "worker": [
        "worker0.example.com:2222", 
        "worker1.example.com:2222",
        "worker2.example.com:2222"
    ],
    "ps": [
        "ps0.example.com:2222",
        "ps1.example.com:2222"
    ]})
/job:worker/task:0
/job:worker/task:1
/job:worker/task:2
/job:ps/task:0
/job:ps/task:1




在每个任务里,创建一个 tf.train.Server 的实例

一个tf.train.server对象包含一组本地设备,一组连接在tf.train.clusterspec其他任务, 和一个“会话”的目标,可以使用它们来执行分布式计算。 每个服务器都是一个特定的命名作业的成员,并在该作业中有一个任务索引。 服务器可以与集群中的任何其他服务器进行通信。.

例如,有两个服务器在本地运行启动一个集群:localhost:2222,localhost:2223,在本地机器运行下面的两个不同进程的代码:

# In task 0:
cluster = tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})
server = tf.train.Server(cluster, job_name="local", task_index=0)
# In task 1:
cluster = tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})
server = tf.train.Server(cluster, job_name="local", task_index=1)

注:手动指定这些集群规格可以是繁琐的,特别是对于大集群。.





在你的模型里指定分布式设备

把他们的工作在一个特定的进程中,您可以使用相同的tf.device()函数, 用于指定操作运行在CPU或GPU。例如:

with tf.device("/job:ps/task:0"):
  weights_1 = tf.Variable(...)
  biases_1 = tf.Variable(...)

with tf.device("/job:ps/task:1"):
  weights_2 = tf.Variable(...)
  biases_2 = tf.Variable(...)

with tf.device("/job:worker/task:7"):
  input, labels = ...
  layer_1 = tf.nn.relu(tf.matmul(input, weights_1) + biases_1)
  logits = tf.nn.relu(tf.matmul(layer_1, weights_2) + biases_2)
  # ...
  train_op = ...

with tf.Session("grpc://worker7.example.com:2222") as sess:
  for _ in range(10000):
    sess.run(train_op)

在上面的例子中,在ps的job中,变量是创建在两个任务中的, 计算密集型的模型的一部分是在worker的job中创建的。 tensorflow插入适当的数据传输在ps和worker之间的工作(PS为向前工作,从worker到ps应用梯度)。





重复训练

一个常见的训练配置,被称为“数据并行”,包含多个任务的worker job里, 在不同的小批量的数据训练同样的模型, 更新在ps job上的共享参数。所有的任务通常在不同的机器上运行。 有很多方法可以指定tensorflow这个结构,我们正在建设的lib,将简化指定重复模型的工作。可能的方法包括::





总的来说: 举一个例子

下面的代码展示了分布式训练程序的框架 实现了between-graph replication和asynchronous training。它包括参数服务器和worker任务的代码。

import tensorflow as tf

# Flags for defining the tf.train.ClusterSpec
tf.app.flags.DEFINE_string("ps_hosts", "",
                           "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", "",
                           "Comma-separated list of hostname:port pairs")

# Flags for defining the tf.train.Server
tf.app.flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")

FLAGS = tf.app.flags.FLAGS


def main(_):
  ps_hosts = FLAGS.ps_hosts.split(",")
  worker_hosts = FLAGS.worker_hosts.split(",")

  # Create a cluster from the parameter server and worker hosts.
  cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
  
  # Create and start a server for the local task.
  server = tf.train.Server(cluster,
                           job_name=FLAGS.job_name,
                           task_index=FLAGS.task_index)

  if FLAGS.job_name == "ps":
    server.join()
  elif FLAGS.job_name == "worker":

    # Assigns ops to the local worker by default.
    with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % FLAGS.task_index,
        cluster=cluster)):

      # Build model...
      loss = ...
      global_step = tf.Variable(0)

      train_op = tf.train.AdagradOptimizer(0.01).minimize(
          loss, global_step=global_step)

      saver = tf.train.Saver()
      summary_op = tf.merge_all_summaries()
      init_op = tf.initialize_all_variables()

    # Create a "supervisor", which oversees the training process.
    sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
                             logdir="/tmp/train_logs",
                             init_op=init_op,
                             summary_op=summary_op,
                             saver=saver,
                             global_step=global_step,
                             save_model_secs=600)

    # The supervisor takes care of session initialization, restoring from
    # a checkpoint, and closing when done or an error occurs.
    with sv.managed_session(server.target) as sess:
      # Loop until the supervisor shuts down or 1000000 steps have completed.
      step = 0
      while not sv.should_stop() and step < 1000000:
        # Run a training step asynchronously.
        # See `tf.train.SyncReplicasOptimizer` for additional details on how to
        # perform *synchronous* training.
        _, step = sess.run([train_op, global_step])

    # Ask for all the services to stop.
    sv.stop()

if __name__ == "__main__":
  tf.app.run()

要启动具有两个参数服务器和两个worker的训练,使用以下命令行

# On ps0.example.com:
$ python trainer.py \
     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
     --job_name=ps --task_index=0
# On ps1.example.com:
$ python trainer.py \
     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
     --job_name=ps --task_index=1
# On worker0.example.com:
$ python trainer.py \
     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
     --job_name=worker --task_index=0
# On worker1.example.com:
$ python trainer.py \
     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
     --job_name=worker --task_index=1



术语