Using tf.distribute with low-level TensorFlow 1 code and Ray

Problem description

I am trying to distribute the training of a deep reinforcement learning system that I built with Ray and TensorFlow 1. I am using Ray because I have a lot of logic that can be parallelized but is not directly related to training, and I would like to use the tf.distribute utilities to parallelize the training itself (i.e., gradient reduction across workers on different GPUs), mainly because tf.distribute can use the NCCL communication library, which I expect will make training faster than the alternatives.
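
As background, a MirroredStrategy can be told explicitly to use NCCL for its all-reduce through the cross_device_ops argument (a minimal sketch; on multi-GPU Linux builds NCCL is typically the default choice anyway):

import tensorflow.compat.v1 as tf

# Ask MirroredStrategy to use NCCL-backed all-reduce across GPUs.
strategy = tf.distribute.MirroredStrategy(
    cross_device_ops=tf.distribute.NcclAllReduce())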

The problem is that I don't want to refactor my TensorFlow code (written in old low-level TensorFlow 1 with a custom training loop; I am not using any API such as Keras), and I don't know how to use tf.distribute, i.e. MirroredStrategy, to distribute training with TensorFlow 1 code.

I found this guide about tf.distribute, but even in the custom-training-loop section it uses the Keras API to build the model and the optimizer. I tried to follow the guide as closely as possible to build a simple example that uses the libraries/APIs I use in my main project, but I could not get it to work.
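
For reference, the guide's custom training loop has roughly the following shape (a condensed sketch from memory of that guide, so details may differ; it uses Keras for the model and optimizer, assumes TF2 eager mode rather than the disable_v2_behavior setup used below, and experimental_run_v2 was renamed strategy.run in later releases):

import tensorflow as tf

strategy = tf.distribute.MirroredStrategy()

with strategy.scope():
    # Model and optimizer are built under the strategy scope (Keras API).
    model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
    optimizer = tf.keras.optimizers.SGD(learning_rate=0.1)

def train_step(inputs):
    # Per-replica step: forward pass, loss, gradients, optimizer update.
    x, y = inputs
    with tf.GradientTape() as tape:
        loss = tf.reduce_mean(tf.square(model(x) - y))
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))
    return loss

@tf.function
def distributed_train_step(inputs):
    # The per-replica function runs once per replica; the reduction
    # happens here, in the cross-replica context.
    per_replica_losses = strategy.experimental_run_v2(train_step, args=(inputs,))
    return strategy.reduce(
        tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)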

The example code is as follows:

import numpy as np
import tensorflow.compat.v1 as tf

import ray

tf.disable_v2_behavior()


@ray.remote(num_cpus=1, num_gpus=0)
class Trainer:
    def __init__(self, local_data):
        tf.disable_v2_behavior()
        self.current_w = 1.0
        self.local_data = local_data
        self.strategy = tf.distribute.MirroredStrategy()

        with self.strategy.scope():
            # Single scalar parameter w and the linear model y = w * x.
            self.w = tf.Variable(1.0, dtype=tf.float32)
            self.x = tf.placeholder(shape=(None, 1), dtype=tf.float32)
            self.y = self.w * self.x
            self.grad = tf.gradients(self.y, [self.w])

            def train_step_opt():
                def grad_fn():
                    grad = tf.gradients(self.y, [self.w])
                    return grad
                # Compute the gradient on each replica, sum the results,
                # and apply the summed gradient to w.
                per_replica_grad = self.strategy.experimental_run_v2(grad_fn)
                red_grad = self.strategy.reduce(
                    tf.distribute.ReduceOp.SUM, per_replica_grad, axis=None)
                minimize = self.w.assign_sub(red_grad[0])
                return minimize

            self.minimize = self.strategy.experimental_run_v2(train_step_opt)

    def train_step(self):
        with self.strategy.scope():
            with tf.Session() as sess:
                sess.run(self.minimize, feed_dict={self.x: self.local_data})
                self.current_w = sess.run(self.w)
        return self.current_w


ray.init()

data = np.arange(4) + 1
data = data.reshape((-1, 1))
data_w = [data[None, i] for i in range(4)]

trainers = [Trainer.remote(d) for d in data_w]

W = ray.get([t.train_step.remote() for t in trainers])[0]

print(W)

It should simply compute the derivative of a linear function in different processes, reduce all the derivatives into a single value, and apply that value to the only parameter, w.
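
As a quick check of the intended arithmetic (my own worked numbers, assuming the reduction really sums the per-worker gradients):

import numpy as np

# dy/dw for y = w * x is just x, so each worker's gradient equals its
# input sample; summing across the four workers (inputs 1..4) gives 10,
# and w.assign_sub(10) would leave w = 1 - 10 = -9.
xs = np.arange(4) + 1
print(1.0 - xs.sum())  # -9.0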

When I run it, I get the following error:

Traceback (most recent call last):
  File "dtfray.py", line 49, in <module>
    r = ray.get([t.train_step.remote() for t in trainers])
  File "/home/Adrian/miniconda3/envs/sukan_env/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 47, in wrapper
    return func(*args, **kwargs)
  File "/home/Adrian/miniconda3/envs/sukan_env/lib/python3.7/site-packages/ray/worker.py", line 1456, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(TypeError): ray::Trainer.train_step() (pid=25401, ip=10.128.0.46)
  File "python/ray/_raylet.pyx", line 439, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 473, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 476, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 480, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 432, in ray._raylet.execute_task.function_executor
  File "dtfray.py", line 32, in __init__
    self.minimize = self.strategy.experimental_run_v2(train_step_opt)
  File "/home/Adrian/miniconda3/envs/sukan_env/lib/python3.7/site-packages/tensorflow/python/util/deprecation.py", line 324, in new_func
    return func(*args, **kwargs)
  File "/home/Adrian/miniconda3/envs/sukan_env/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_lib.py", line 957, in experimental_run_v2
    return self.run(fn, args=args, kwargs=kwargs, options=options)
  File "/home/Adrian/miniconda3/envs/sukan_env/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_lib.py", line 951, in run
    return self._extended.call_for_each_replica(fn, kwargs=kwargs)
  File "/home/Adrian/miniconda3/envs/sukan_env/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_lib.py", line 2290, in call_for_each_replica
    return self._call_for_each_replica(fn, args, kwargs)
  File "/home/Adrian/miniconda3/envs/sukan_env/lib/python3.7/site-packages/tensorflow/python/distribute/mirrored_strategy.py", line 770, in _call_for_each_replica
    fn, args, kwargs)
  File "/home/Adrian/miniconda3/envs/sukan_env/lib/python3.7/site-packages/tensorflow/python/distribute/mirrored_strategy.py", line 201, in _call_for_each_replica
    coord.join(threads)
  File "/home/Adrian/miniconda3/envs/sukan_env/lib/python3.7/site-packages/tensorflow/python/training/coordinator.py", line 389, in join
    six.reraise(*self._exc_info_to_raise)
  File "/home/Adrian/miniconda3/envs/sukan_env/lib/python3.7/site-packages/six.py", line 703, in reraise
    raise value
  File "/home/Adrian/miniconda3/envs/sukan_env/lib/python3.7/site-packages/tensorflow/python/training/coordinator.py", line 297, in stop_on_exception
    yield
  File "/home/Adrian/miniconda3/envs/sukan_env/lib/python3.7/site-packages/tensorflow/python/distribute/mirrored_strategy.py", line 998, in run
    self.main_result = self.main_fn(*self.main_args, **self.main_kwargs)
  File "/home/Adrian/miniconda3/envs/sukan_env/lib/python3.7/site-packages/tensorflow/python/autograph/impl/api.py", line 282, in wrapper
    return func(*args, **kwargs)
  File "dtfray.py", line 22, in train_step_opt
    tf.distribute.get_replica_context().merge_call()
TypeError: merge_call() missing 1 required positional argument: 'merge_fn'
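
Two things stand out in the traceback. First, its last frame points at a bare tf.distribute.get_replica_context().merge_call() inside train_step_opt (a call that does not appear in the snippet above); merge_call requires a merge_fn argument, which is exactly what the TypeError says. Second, train_step_opt calls strategy.reduce, a cross-replica operation, yet it is itself executed through a second experimental_run_v2, whereas the guide keeps the reduction outside the replicated function. A minimal sketch of that un-nested shape for this toy problem (TF1 graph mode; an untested assumption, not a verified fix for the setup above):

import numpy as np
import tensorflow.compat.v1 as tf

tf.disable_v2_behavior()

strategy = tf.distribute.MirroredStrategy()

with strategy.scope():
    w = tf.Variable(1.0, dtype=tf.float32)
    x = tf.placeholder(shape=(None, 1), dtype=tf.float32)

    def grad_fn():
        # Per-replica computation only: gradient of y = w * x w.r.t. w.
        y = w * x
        return tf.gradients(y, [w])[0]

    # One level of experimental_run_v2; the reduce and the update happen
    # once, in the cross-replica context.
    per_replica_grad = strategy.experimental_run_v2(grad_fn)
    red_grad = strategy.reduce(
        tf.distribute.ReduceOp.SUM, per_replica_grad, axis=None)
    minimize = w.assign_sub(red_grad)

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    sess.run(minimize, feed_dict={x: np.arange(4).reshape(-1, 1) + 1.0})
    print(sess.run(w))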

Solution

No working solution to this problem has been found yet.
