如何通过sftp进行异步文件传输?

问题描述

我要解决的问题如下: 我有一台台式计算机,要分析大量数据(约5 TB)。数据包含500k文件,每个文件都可以单独分析。 为了进行分析,我在大学里有一系列服务器可用,但是服务器没有足够的空间来存储所有这些数据,也没有空间来存储分析的输出

所以我的想法是将数据分段地复制到服务器上,运行分析,将结果传输回桌面,删除服务器上的输入和输出数据,然后重复。

对于文件传输,我昨天安装了paramiko,它似乎工作得很好:

remote_get = 'test'
local_deliver = './test'

ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.load_host_keys(os.path.expanduser(os.path.join("~",".ssh","kNown_hosts")))
ssh.connect(server,username=username,password=password)
sftp = ssh.open_sftp()

for root,dirs,files in os.walk(local_path):
    for fname in files:
        full_fname = os.path.join(root,fname)
        full_remote = os.path.join(remote_path,fname)
        sftp.put(full_fname,full_remote)
sftp.close()
ssh.close()

但是,我唯一的问题是,我需要传输的数据量可能要花费数天才能往返,因此,如果可能的话,我希望异步启动数据传输,这样我就可以对当前数据集,同时传输下一个要分析的数据集。

但是我对如何做这样的事情一无所知,有人能指出我正确的方向吗?

解决方法

此解决方案使用multiprocessing.Pool创建一个由单独进程组成的任务池。每次调用apply_async时,都会传递一个函数指针和一个args列表。在这种情况下,要执行的功能是copy_file,而arg是文件名:

import os
import paramiko

from multiprocessing import Pool

remote_get = 'test'
local_deliver = './test'

pool = Pool(processes=4)  # Experiment with this number based on your # CPUs
def copy_file(filename):
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.load_host_keys(os.path.expanduser(os.path.join("~",".ssh","known_hosts")))
    ssh.connect(server,username=username,password=password)
    sftp = ssh.open_sftp()

    full_fname = os.path.join(root,fname)
    full_remote = os.path.join(remote_path,fname)
    sftp.put(full_fname,full_remote)
    sftp.close()
    ssh.close()

for root,dirs,files in os.walk(local_deliver):
    for fname in files:
        pool.apply_async(copy_file,[fname])

您的原始变量中有一些没有说明,所以我用了我的最佳猜测。 ssh和sftp客户端的创建需要移至copy_file内,因为除非将其序列化,否则您将无法保存并在进程之间共享它。

可以根据CPU的数量来调整processes上的multiprocessing.Pool参数,但是请记住,您将在这里面对多个瓶颈:1. CPU,2. NIC带宽限制, 3.磁盘I / O限制。

这里有更多的多处理文档:https://docs.python.org/3/library/multiprocessing.html#using-a-pool-of-workers

编辑:我也记得,paramiko的SFTP与炮击SFTP命令的速度明显慢。写出批处理文件并用subprocess.call执行以获得更好的性能可能是值得的。