问题描述
|
我需要使用sqlBulkcopy将大型csv文件分块为几个不同的数据库插入。我打算通过2个单独的任务来执行此操作,其中1个用于批处理CSV文件,另一个用于插入数据库。例如,这就是我要处理的事情:
public class UberTask
{
private readonly BlockingCollection<Tuple<string,int>> _store = new BlockingCollection<Tuple<string,int>>();
public void PerformTask()
{
var notifier = new UINotifier();
Task.Factory.StartNew(() =>
{
for (int i =0; i < 10; i++)
{
string description = string.Format(\"Scenario {0}\",i);
notifier.PerformOnTheUIThread(() => Console.WriteLine(string.Format(\"Reading \'{0}\' from file\",description)));
// represents reading the CSV file.
Thread.Sleep(500);
notifier.PerformOnTheUIThread(() => Console.WriteLine(string.Format(\"Enqueuing \'{0}\'\",description)));
_store.Add(new Tuple<string,int>(description,i));
}
_store.CompleteAdding();
});
var consumer = Task.Factory.StartNew(() =>
{
foreach (var item in _store.GetConsumingEnumerable())
{
var poppedItem = item;
notifier.PerformOnTheUIThread(() => Console.WriteLine(string.Format(\"Sending \'{0}\' to the database\",poppedItem.Item1)));
// represents sending stuff to the database.
Thread.Sleep(1000);
}
});
consumer.Wait();
Console.WriteLine(\"complete\");
}
}
这是将两组相关任务配对的好方法吗?以上代码未处理的内容(需要处理):
如果代表CSV读取的任务出现故障,则其他任务需要停止(即使_store中仍有项目)。
如果代表数据库的任务插入错误,则其他进程可以停止处理。
如果配对任务中的任何一个出现故障,我将需要执行一些操作以回滚数据库更新(我不担心如何回滚),这更多地是关于如何编码“故障”的问题。发生在配对任务之一中,因此我需要进行一些整理。”
以上任何帮助将不胜感激!
解决方法
您可以使用异常处理和取消令牌来执行此操作。当管道阶段检测到错误时,它将捕获该错误并设置令牌。这将取消其他阶段。 finally块确保完成对CompleteAdding()的调用。这一点很重要,因为接收管道阶段可能会在等待收集时被阻塞,并且直到取消取消后才会处理取消。
您还希望排除集合中所有未处理的对象,或者在管道阶段完成时(最后)和/或在整个管道关闭时清理数据库连接。
这是执行此操作的管道阶段的示例:
static void LoadPipelinedImages(IEnumerable<string> fileNames,string sourceDir,BlockingCollection<ImageInfo> original,CancellationTokenSource cts)
{
// ...
var token = cts.Token;
ImageInfo info = null;
try
{
foreach (var fileName in fileNames)
{
if (token.IsCancellationRequested)
break;
info = LoadImage(fileName,...);
original.Add(info,token);
info = null;
}
}
catch (Exception e)
{
// in case of exception,signal shutdown to other pipeline tasks
cts.Cancel();
if (!(e is OperationCanceledException))
throw;
}
finally
{
original.CompleteAdding();
if (info != null) info.Dispose();
}
}
总体管道代码如下所示。它还支持通过设置取消令牌从外部(从UI)取消管道。
static void RunPipelined(IEnumerable<string> fileNames,int queueLength,Action<ImageInfo> displayFn,CancellationTokenSource cts)
{
// Data pipes
var originalImages = new BlockingCollection<ImageInfo>(queueLength);
var thumbnailImages = new BlockingCollection<ImageInfo>(queueLength);
var filteredImages = new BlockingCollection<ImageInfo>(queueLength);
try
{
var f = new TaskFactory(TaskCreationOptions.LongRunning,TaskContinuationOptions.None);
// ...
// Start pipelined tasks
var loadTask = f.StartNew(() =>
LoadPipelinedImages(fileNames,sourceDir,originalImages,cts));
var scaleTask = f.StartNew(() =>
ScalePipelinedImages(originalImages,thumbnailImages,cts));
var filterTask = f.StartNew(() =>
FilterPipelinedImages(thumbnailImages,filteredImages,cts));
var displayTask = f.StartNew(() =>
DisplayPipelinedImages(filteredImages.GetConsumingEnumerable(),... cts));
Task.WaitAll(loadTask,scaleTask,filterTask,displayTask);
}
finally
{
// in case of exception or cancellation,there might be bitmaps
// that need to be disposed.
DisposeImagesInQueue(originalImages);
DisposeImagesInQueue(thumbnailImages);
DisposeImagesInQueue(filteredImages);
}
}
有关完整示例,请参见此处下载的管道示例:
http://parallelpatterns.codeplex.com/releases/view/50473
在这里讨论:
http://msdn.microsoft.com/en-us/library/ff963548.aspx