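Problem description
I want to recursively download multiple files from an FTP directory. For this I'm using the FluentFTP library, and my code looks like this: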
private async Task downloadRecursively(string src, string dest, FtpClient ftp)
{
    foreach (var item in ftp.GetListing(src))
    {
        if (item.Type == FtpFileSystemObjectType.Directory)
        {
            if (item.Size != 0)
            {
                System.IO.Directory.CreateDirectory(Path.Combine(dest, item.Name));
                // Recurse into the subdirectory and wait for it to finish
                await downloadRecursively(Path.Combine(src, item.Name), Path.Combine(dest, item.Name), ftp);
            }
        }
        else if (item.Type == FtpFileSystemObjectType.File)
        {
            // FluentFTP's DownloadFileAsync takes (localPath, remotePath)
            await ftp.DownloadFileAsync(Path.Combine(dest, item.Name), Path.Combine(src, item.Name));
        }
    }
}
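I know each concurrent download needs its own FtpClient. How can I make the best use of a fixed number of connections? I suppose the idea is to create, connect, download, and close a client for every file I find, with X files downloading at the same time. I'm also not sure whether I should use async, threads, or tasks. My biggest question is how to implement all of this.
The answer from @Bradley here looks good, but that question reads every file to download from an external file. It also has no maximum number of concurrent downloads, so I'm not sure how to meet these two requirements.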
Solution
Use:
- the ConcurrentBag class to implement a connection pool;
- the Parallel class to parallelize the operation;
- ParallelOptions.MaxDegreeOfParallelism to limit the number of concurrent threads.
var clients = new ConcurrentBag<FtpClient>();

var opts = new ParallelOptions { MaxDegreeOfParallelism = maxConnections };
Parallel.ForEach(files, opts, file =>
{
    file = Path.GetFileName(file);

    string thread = $"Thread {Thread.CurrentThread.ManagedThreadId}";
    // Reuse an idle pooled connection, or open a new one if the pool is empty
    if (!clients.TryTake(out var client))
    {
        Console.WriteLine($"{thread} Opening connection...");
        client = new FtpClient(host, user, pass);
        client.Connect();
        Console.WriteLine($"{thread} Opened connection {client.GetHashCode()}.");
    }

    string remotePath = sourcePath + "/" + file;
    string localPath = Path.Combine(destPath, file);
    string desc =
        $"{thread}, Connection {client.GetHashCode()}, " +
        $"File {remotePath} => {localPath}";
    Console.WriteLine($"{desc} - Starting...");
    client.DownloadFile(localPath, remotePath);
    Console.WriteLine($"{desc} - Done.");

    // Return the connection to the pool for the next iteration
    clients.Add(client);
});

// All downloads are done; close the pooled connections
Console.WriteLine($"Closing {clients.Count} connections");
foreach (var client in clients)
{
    Console.WriteLine($"Closing connection {client.GetHashCode()}");
    client.Dispose();
}
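To check the pooling logic without an FTP server, here is a minimal simulation of the same pattern. It assumes .NET 6 or later (top-level statements). The integer "connections" and the `Thread.Sleep` "download" are hypothetical stand-ins for `FtpClient` and `DownloadFile`. The sketch counts how many connections get opened and how many downloads overlap.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Simulated ConcurrentBag pool + Parallel.ForEach: connections are ints and the
// download is a short sleep (hypothetical stand-ins for FtpClient/DownloadFile).
const int maxConnections = 3;
var files = Enumerable.Range(1, 20).Select(i => $"file{i}.bin").ToArray();

var pool = new ConcurrentBag<int>();   // idle connections
int opened = 0;                        // connections ever created
int inFlight = 0, peakInFlight = 0;    // concurrent "downloads"
var downloaded = new ConcurrentBag<string>();

var opts = new ParallelOptions { MaxDegreeOfParallelism = maxConnections };
Parallel.ForEach(files, opts, file =>
{
    // Reuse an idle connection, or "open" a new one if the pool is empty
    if (!pool.TryTake(out var conn))
        conn = Interlocked.Increment(ref opened);

    int now = Interlocked.Increment(ref inFlight);
    // Record the highest concurrency seen so far (lock-free max)
    int seen;
    while (now > (seen = Volatile.Read(ref peakInFlight)) &&
           Interlocked.CompareExchange(ref peakInFlight, now, seen) != seen) { }

    Thread.Sleep(10);                  // stand-in for client.DownloadFile(...)
    downloaded.Add(file);

    Interlocked.Decrement(ref inFlight);
    pool.Add(conn);                    // return the connection to the pool
});

Console.WriteLine($"Opened {opened} connections, peak {peakInFlight} concurrent downloads");
```

A connection is opened only when the bag is empty, and at most `MaxDegreeOfParallelism` iterations run at once. So the number of opened connections should never exceed `maxConnections`, and every connection ends up back in the bag.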
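Another approach is to start a fixed number of threads, each with its own connection, and have them pick files from a queue.
For an example implementation, see my article about the WinSCP .NET assembly: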
Automating transfers in parallel connections over SFTP/FTP protocol
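Here is a rough sketch of that fixed-worker idea in plain .NET rather than WinSCP. A `ConcurrentQueue` holds the file names, and a fixed number of async workers drain it. It assumes .NET 6+ top-level statements. The worker ids and the `Task.Delay` download are placeholders for opening one real connection per worker and transferring the file.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Fixed worker pool: each worker would own one connection (simulated here)
// and keeps dequeuing file names until the queue is empty.
const int workerCount = 4;
var queue = new ConcurrentQueue<string>(Enumerable.Range(1, 25).Select(i => $"file{i}.bin"));
var results = new ConcurrentDictionary<string, int>();   // file -> worker that fetched it

async Task Worker(int id)
{
    // Real code would open this worker's connection here, once
    while (queue.TryDequeue(out var file))
    {
        await Task.Delay(5);           // stand-in for the actual download
        results[file] = id;
    }
    // ...and close it here
}

var workers = Enumerable.Range(1, workerCount).Select(id => Worker(id)).ToArray();
await Task.WhenAll(workers);

Console.WriteLine($"Downloaded {results.Count} files with {results.Values.Distinct().Count()} workers");
```

The number of connections equals `workerCount` by construction, and no per-file pool lookup is needed.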
A similar question about SFTP:
Processing SFTP files using C# Parallel.ForEach loop not processing downloads
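Here is a TPL Dataflow approach. A BufferBlock<FtpClient> is used as a pool of FtpClient objects. The recursive enumeration takes a parameter of type IEnumerable<string>, which holds the segments of one file path. These segments are combined differently when building the local and the remote file paths. As a side effect of the recursive enumeration, the paths of the remote files are sent to an ActionBlock<IEnumerable<string>>. This block downloads the files in parallel. Its Completion property eventually contains every exception that occurred during the whole operation.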
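To make the segment idea concrete, this sketch builds both paths from one list of segments. It assumes .NET 6+ top-level statements. The folder and file names are invented. The empty first segment corresponds to `ftpRoot = ""` in this answer's usage example.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

// Segments of one file path, as passed around by Recurse.
// The names are made up; "" plays the role of an empty ftpRoot.
IEnumerable<string> dir = new[] { "", "docs" };
IEnumerable<string> file = dir.Append("readme.txt");   // like path.Append(item.Name)
string targetDirectory = Path.Combine(Path.GetTempPath(), "FtpTest");

// Remote side: FTP paths are joined with '/'
string remotePath = string.Join("/", file);

// Local side: OS-specific separators under the target directory;
// Path.Combine omits the zero-length root segment
string localPath = Path.Combine(file.Prepend(targetDirectory).ToArray());

Console.WriteLine(remotePath);   // /docs/readme.txt
Console.WriteLine(localPath);
```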
public static Task FtpDownloadDeep(string ftpHost, string ftpRoot,
    string targetDirectory, string username = null, string password = null,
    int maximumConnections = 1)
{
    // Arguments validation omitted
    if (!Directory.Exists(targetDirectory))
        throw new DirectoryNotFoundException(targetDirectory);
    var fsLocker = new object();

    var ftpClientPool = new BufferBlock<FtpClient>();

    async Task<TResult> UsingFtpAsync<TResult>(Func<FtpClient, Task<TResult>> action)
    {
        var client = await ftpClientPool.ReceiveAsync();
        try { return await action(client); }
        finally { ftpClientPool.Post(client); } // Return to the pool
    }

    var downloader = new ActionBlock<IEnumerable<string>>(async path =>
    {
        var remotePath = String.Join("/", path);
        var localPath = Path.Combine(path.Prepend(targetDirectory).ToArray());
        var localDir = Path.GetDirectoryName(localPath);
        lock (fsLocker) Directory.CreateDirectory(localDir);
        var status = await UsingFtpAsync(client =>
            client.DownloadFileAsync(localPath, remotePath));
        if (status == FtpStatus.Failed) throw new InvalidOperationException(
            $"Download of '{remotePath}' failed.");
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = maximumConnections,
        BoundedCapacity = maximumConnections,
    });

    async Task Recurse(IEnumerable<string> path)
    {
        if (downloader.Completion.IsCompleted) return; // The downloader has failed
        var listing = await UsingFtpAsync(client =>
            client.GetListingAsync(String.Join("/", path)));
        foreach (var item in listing)
        {
            if (item.Type == FtpFileSystemObjectType.Directory)
            {
                if (item.Size != 0) await Recurse(path.Append(item.Name));
            }
            else if (item.Type == FtpFileSystemObjectType.File)
            {
                var accepted = await downloader.SendAsync(path.Append(item.Name));
                if (!accepted) break; // The downloader has failed
            }
        }
    }

    // Move on to the thread pool, to avoid ConfigureAwait(false) everywhere
    return Task.Run(async () =>
    {
        // Fill the FtpClient pool
        for (int i = 0; i < maximumConnections; i++)
        {
            var client = new FtpClient(ftpHost);
            if (username != null && password != null)
                client.Credentials = new NetworkCredential(username, password);
            ftpClientPool.Post(client);
        }

        try
        {
            // Enumerate the files to download
            await Recurse(new[] { ftpRoot });
            downloader.Complete();
        }
        catch (Exception ex) { ((IDataflowBlock)downloader).Fault(ex); }

        try
        {
            // Await the downloader to complete
            await downloader.Completion;
        }
        catch (OperationCanceledException)
            when (downloader.Completion.IsCanceled) { throw; }
        catch { downloader.Completion.Wait(); } // Propagate AggregateException
        finally
        {
            // Clean up
            if (ftpClientPool.TryReceiveAll(out var clients))
                foreach (var client in clients) client.Dispose();
        }
    });
}
Usage example:
await FtpDownloadDeep("ftp://ftp.test.com", "", @"C:\FtpTest",
    "username", "password", maximumConnections: 10);
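Note: the implementation above enumerates the remote directory lazily, at the pace of the downloads. You may prefer to enumerate it as fast as possible and collect all the available information about the remote listing up front. If so, remove the BoundedCapacity = maximumConnections setting from the ActionBlock that downloads the files. Be aware that this can lead to high memory consumption if the remote directory has a deep hierarchy of subfolders that together contain a large number of small files.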
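The back-pressure effect of `BoundedCapacity` can be illustrated with a base-library analogue. This sketch swaps TPL Dataflow for a bounded `BlockingCollection<T>`, a different type chosen only because it ships with .NET. The producer stands in for the directory enumeration and blocks while the buffer is full. It therefore stays at most the buffered items plus the items being processed ahead of the consumers. The sketch assumes .NET 6+, and the sleep is a simulated download.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Analogy only: a bounded BlockingCollection instead of a Dataflow block.
const int capacity = 2, consumers = 2, total = 30;
using var buffer = new BlockingCollection<int>(boundedCapacity: capacity);
int produced = 0, finished = 0, maxAhead = 0;

// Producer ("enumeration"): Add blocks while `capacity` items are waiting
var producer = Task.Run(() =>
{
    for (int i = 0; i < total; i++)
    {
        buffer.Add(i);
        int ahead = Interlocked.Increment(ref produced) - Volatile.Read(ref finished);
        maxAhead = Math.Max(maxAhead, ahead);   // only the producer writes maxAhead
    }
    buffer.CompleteAdding();
});

// Consumers ("downloads")
var workers = Enumerable.Range(0, consumers).Select(_ => Task.Run(() =>
{
    foreach (var item in buffer.GetConsumingEnumerable())
    {
        Thread.Sleep(5);                        // stand-in for a download
        Interlocked.Increment(ref finished);
    }
})).ToArray();

await Task.WhenAll(workers.Append(producer));
Console.WriteLine($"Producer was at most {maxAhead} items ahead of the downloads");
```

Constructing the collection without `boundedCapacity` removes the limit, so the producer can run arbitrarily far ahead. This mirrors the memory trade-off described in the note.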
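I would split it into three parts:
- Recursively build a list of source/destination pairs.
- Create the required directories.
- Download the files concurrently.
The last part is the slow one and should be done in parallel.
Here is the code: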
private async Task DownloadRecursively(string src, string dest, FtpClient ftp)
{
    /* 1 */
    IEnumerable<(string source, string destination)> Recurse(string s, string d)
    {
        foreach (var item in ftp.GetListing(s))
        {
            if (item.Type == FtpFileSystemObjectType.Directory)
            {
                if (item.Size != 0)
                {
                    // Remote FTP paths use '/', local paths use Path.Combine
                    foreach (var pair in Recurse($"{s}/{item.Name}", Path.Combine(d, item.Name)))
                    {
                        yield return pair;
                    }
                }
            }
            else if (item.Type == FtpFileSystemObjectType.File)
            {
                yield return ($"{s}/{item.Name}", Path.Combine(d, item.Name));
            }
        }
    }

    var pairs = Recurse(src, dest).ToArray();

    /* 2 */
    // Create each distinct parent directory of the destination files
    foreach (var d in pairs.Select(x => Path.GetDirectoryName(x.destination)).Distinct())
    {
        System.IO.Directory.CreateDirectory(d);
    }

    /* 3 */
    // FluentFTP's DownloadFileAsync takes (localPath, remotePath)
    var downloads =
        pairs
            .AsParallel()
            .Select(x => ftp.DownloadFileAsync(x.destination, x.source))
            .ToArray();

    await Task.WhenAll(downloads);
}
This code should be clean, tidy, and easy to reason about.
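The question notes that each concurrent download needs its own `FtpClient`. Step 3 above, however, sends every download through the single `ftp` instance and sets no upper limit. A common way to cap async concurrency is `SemaphoreSlim`, and the following minimal simulated sketch uses it (.NET 6+ top-level statements). `DownloadAsync` is hypothetical. In real code it would create and connect a client, call `DownloadFileAsync(destination, source)`, and dispose the client.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Step 3 with a concurrency cap: at most maxConnections transfers at once.
const int maxConnections = 3;
var pairs = Enumerable.Range(1, 12)
    .Select(i => (source: $"/remote/file{i}", destination: $"file{i}"))
    .ToArray();

int active = 0, peak = 0;
var done = new List<string>();

// Hypothetical stand-in: connect, download, dispose, one client per call
async Task DownloadAsync((string source, string destination) pair)
{
    int now = Interlocked.Increment(ref active);
    lock (done) peak = Math.Max(peak, now);
    await Task.Delay(10);                        // simulated transfer
    lock (done) done.Add(pair.destination);
    Interlocked.Decrement(ref active);
}

using var throttle = new SemaphoreSlim(maxConnections);
var downloads = pairs.Select(async pair =>
{
    await throttle.WaitAsync();                  // wait for a free slot
    try { await DownloadAsync(pair); }
    finally { throttle.Release(); }              // free the slot even on failure
}).ToArray();
await Task.WhenAll(downloads);

Console.WriteLine($"{done.Count} files, peak concurrency {peak}");
```

Because each task holds a semaphore slot for the whole transfer, no more than `maxConnections` clients would be open at any moment.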