问题描述
我要解决的问题如下: 我有一台台式计算机,要分析大量数据(约5 TB)。数据包含500k文件,每个文件都可以单独分析。 为了进行分析,我在大学里有一系列服务器可用,但是服务器没有足够的空间来存储所有这些数据,也没有空间来存储分析的输出。
所以我的想法是将数据分段地复制到服务器上,运行分析,将结果传输回桌面,删除服务器上的输入和输出数据,然后重复。
对于文件传输,我昨天安装了paramiko,它似乎工作得很好:
remote_get = 'test'
local_deliver = './test'
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.load_host_keys(os.path.expanduser(os.path.join("~",".ssh","kNown_hosts")))
ssh.connect(server,username=username,password=password)
sftp = ssh.open_sftp()
for root,dirs,files in os.walk(local_path):
for fname in files:
full_fname = os.path.join(root,fname)
full_remote = os.path.join(remote_path,fname)
sftp.put(full_fname,full_remote)
sftp.close()
ssh.close()
但是,我唯一的问题是,我需要传输的数据量可能要花费数天才能往返,因此,如果可能的话,我希望异步启动数据传输,这样我就可以对当前数据集,同时传输下一个要分析的数据集。
但是我对如何做这样的事情一无所知,有人能指出我正确的方向吗?
解决方法
此解决方案使用multiprocessing.Pool
创建一个由单独进程组成的任务池。每次调用apply_async
时,都会传递一个函数指针和一个args列表。在这种情况下,要执行的功能是copy_file
,而arg是文件名:
import os
import paramiko
from multiprocessing import Pool
remote_get = 'test'
local_deliver = './test'
pool = Pool(processes=4) # Experiment with this number based on your # CPUs
def copy_file(filename):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.load_host_keys(os.path.expanduser(os.path.join("~",".ssh","known_hosts")))
ssh.connect(server,username=username,password=password)
sftp = ssh.open_sftp()
full_fname = os.path.join(root,fname)
full_remote = os.path.join(remote_path,fname)
sftp.put(full_fname,full_remote)
sftp.close()
ssh.close()
for root,dirs,files in os.walk(local_deliver):
for fname in files:
pool.apply_async(copy_file,[fname])
您的原始变量中有一些没有说明,所以我用了我的最佳猜测。 ssh和sftp客户端的创建需要移至copy_file
内,因为除非将其序列化,否则您将无法保存并在进程之间共享它。
可以根据CPU的数量来调整processes
上的multiprocessing.Pool
参数,但是请记住,您将在这里面对多个瓶颈:1. CPU,2. NIC带宽限制, 3.磁盘I / O限制。
这里有更多的多处理文档:https://docs.python.org/3/library/multiprocessing.html#using-a-pool-of-workers
编辑:我也记得,paramiko的SFTP与炮击SFTP命令的速度明显慢。写出批处理文件并用subprocess.call
执行以获得更好的性能可能是值得的。