问题描述
我想运行一个最简单的例子来计算分布式 autograd 并在我的主脚本中打印梯度。为此,我执行以下操作:
- 以 master/worker0 的身份启动主进程
- 以 worker1 身份启动助手
- 从 worker1 发送以通过 rpc 进行计算
- 从远程获取渐变并打印
由于某种原因,这不起作用。看来我的工人彼此陷入僵局。我不确定为什么。起初我收到一个错误,抱怨工作人员似乎正在尝试使用相同的端口(这是我的假设,因为一旦工作人员/进程使用不同的端口,它就会得到修复)。
现在我解决了这个问题,我知道它们彼此死锁,并且它们不执行我的分布式 autograd 计算。我很清楚为什么。我认为这与 init_rpc 有关,因为它们似乎就停在那里。 init_rpc 的文档如下:
torch.distributed.rpc.init_rpc(name,backend=None,rank=-1,world_size=None,rpc_backend_options=None) 初始化本地RPC代理和分布式autograd等RPC原语,立即使当前进程准备就绪发送和接收 RPC。
或者可能是由于shutdown
torch.distributed.rpc.shutdown(graceful=True) 关闭 RPC 代理,然后销毁 RPC 代理。这会阻止本地代理接受未完成的请求,并通过终止所有 RPC 线程来关闭 RPC 框架。如果graceful=True,这将阻塞,直到所有本地和远程RPC 进程到达此方法并等待所有未完成的工作完成。否则,如果graceful=False,则为本地关闭,不等待其他RPC进程到达此方法。
我尝试以不同的组合从我的代码中删除/添加关闭,但它仍然陷入僵局......这就是为什么我最终提出了一个问题,因为我不确定为什么会出现这个问题。
我的代码非常简单,我只是远程多重化两个张量,然后尝试获得梯度。而已。灵感来自这个a simple end to end example。
我已经没有主意了。有人知道是怎么回事吗?
import os
import torch
from torch.distributed import rpc
import torch.multiprocessing as mp
from torch.multiprocessing import Process
def start_worker(rank,world_size):
# initialize worker proc
os.environ['MASTER_ADDR'] = '127.0.0.1'
os.environ['MASTER_PORT'] = f'2950{rank}'
print(f'initializing worker{rank}')
rpc.init_rpc(f'worker{rank}',rank=rank,world_size=world_size)
print(f'finished initializing worker{rank}')
# block until all rpcs finish,and shutdown the RPC instance
# rpc.shutdown()
def test1_rpc():
print('running test1_rpc')
world_size = 2
# initialize worker
worker_rank = 1
worker_proc = Process(target=start_worker,args=(worker_rank,world_size))
worker_proc.start()
# initialize main as master
rank = 0 # master rank
os.environ['MASTER_ADDR'] = '127.0.0.1'
os.environ['MASTER_PORT'] = f'2950{rank}'
# Initializes RPC primitives such as the local RPC agent and distributed autograd,which immediately makes the current process ready to send and receive RPCs.
print(f'initializing worker{rank}')
rpc.init_rpc(f'worker{rank}',world_size=world_size)
print(f'finished initializing worker{rank}')
# # initialize worker
# worker_rank = 1
# worker_proc = Process(target=start_worker,world_size))
# worker_proc.start()
# print('initialized and started worker proc')
# do distributed autograd backward pass
def loss(a,b):
return a.mm(b).sum()
with torch.distributed.autograd.context() as context_id:
print('in distributed autograd')
a = torch.ones([2,5],requires_grad=True)
b = torch.ones([5,3],requires_grad=True)
loss = rpc.rpc_sync(to='worker1',func=loss,args=(a,b))
torch.distributed.autograd(context_id,loss)
print(a.grad)
print(b.grad)
rpc.shutdown()
def test1():
a = torch.ones([2,requires_grad=True)
b = torch.ones([5,requires_grad=True)
loss = a.mm(b).sum()
loss.backward()
print(a.grad)
print(b.grad)
print()
if __name__ == '__main__':
test1()
test1_rpc()
输出:
tensor([[3.,3.,3.],[3.,3.]])
tensor([[2.,2.,2.],[2.,2.]])
running test1_rpc
initializing worker0
initializing worker1
reddit:https://www.reddit.com/r/pytorch/comments/lq0gu7/why_does_my_pytorch_rpc_workers_deadlock_is_it/
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)