问题描述
我已经开始研究 TPL Dataflow 作为处理工作流的解决方案。
处理工作流的要点是从多个表中读取输入消息并从中创建四个反射对象并将它们持久化到其他四个表中,因此每个输入消息应导致创建四个新消息。
我无法确定可以帮助创建四个对象的预定义块之一,起初 TransformManyBlock 似乎是我正在寻找的,但它返回多个相同类型的对象,其中我将有四种类型。
问题示例
我们有两个包含来自两个遗留系统的员工详细信息的表,它们的实体看起来像这样
public partial class EmployeeTblA
{
public int Id { get; set; }
public int System { get; set; }
public string Forename { get; set; }
public string Surname { get; set; }
public int Age { get; set; }
public int Number { get; set; }
public string Street { get; set; }
public string PostCode { get; set; }
public virtual EmployeeSystem SystemNavigation { get; set; }
}
public partial class EmployeeTblB
{
public int Id { get; set; }
public int System { get; set; }
public string Name { get; set; }
public int Age { get; set; }
public string Address { get; set; }
public string Postcode { get; set; }
public virtual EmployeeSystem SystemNavigation { get; set; }
}
我们想从两个系统中获取数据并将数据放入我们闪亮的新系统中,为此我们需要将旧系统中的实体转换为新系统中使用的实体。首先,我们将旧系统中的实体转换为如下所示的基类
public class BaseEmployee
{
public int Id { get; set; }
public int System { get; set; }
public string Name { get; set; }
public int Age { get; set; }
public string Address { get; set; }
public string Postcode { get; set; }
}
然后我们想从基类创建三个新对象,代表新系统的实体,如下所示
public partial class EmployeeName
{
public int Id { get; set; }
public int System { get; set; }
public int LegacyId { get; set; }
public string Name { get; set; }
public virtual EmployeeSystem SystemNavigation { get; set; }
}
public partial class EmployeeAge
{
public int Id { get; set; }
public int System { get; set; }
public int LegacyId { get; set; }
public int Age { get; set; }
public virtual EmployeeSystem SystemNavigation { get; set; }
}
public partial class EmployeeAddress
{
public int Id { get; set; }
public int System { get; set; }
public int LegacyId { get; set; }
public string Address { get; set; }
public string Postcode { get; set; }
public virtual EmployeeSystem SystemNavigation { get; set; }
}
上面例子中我的 TPL 的粗略流程
我知道我可以创建一个自定义块,但我不知道如何使用它为使用数据流的四个单独链接的 ActionBlock 提供输出,有人可以指出我正确的方向吗?
解决方法
Broadcast 块是我最终使用的组件,我用它来将 BaseEmployee 对象广播到其他输出流,从而分离出我需要创建的反射对象。
完整管道如下
_transEmployeeA = new TransformBlock<EmployeeTblA,BaseMsg>((input) =>
{
return new BaseMsg()
{
Id = input.Id,System = input.System,Name = string.Concat(input.Forename," ",input.Surname),Age = input.Age,Address = string.Concat(input.Number,input.Street),Postcode = input.PostCode
};
});
_transEmployeeB = new TransformBlock<EmployeeTblB,BaseMsg>((input) =>
{
return new BaseMsg()
{
Id = input.Id,Name = input.Name,Address = input.Address,Postcode = input.Postcode
};
});
_broadcastBaseMsg = new BroadcastBlock<BaseMsg>(null);
_transEmployeeName = new TransformBlock<BaseMsg,EmployeeName>((baseMsg) =>
{
return new EmployeeName()
{
System = baseMsg.System,LegacyId = baseMsg.Id,Name = baseMsg.Name
};
});
_transEmployeeAge = new TransformBlock<BaseMsg,EmployeeAge>((baseMsg) =>
{
return new EmployeeAge()
{
System = baseMsg.System,Age = baseMsg.Age
};
});
_transEmployeeAddress = new TransformBlock<BaseMsg,EmployeeAddress>((baseMsg) =>
{
return new EmployeeAddress()
{
System = baseMsg.System,Address = baseMsg.Address,Postcode = baseMsg.Postcode
};
});
_bufferEmployeeName = new BufferBlock<EmployeeName>();
_bufferEmployeeAge = new BufferBlock<EmployeeAge>();
_bufferEmployeeAddress = new BufferBlock<EmployeeAddress>();
_actionEmployeeName = new ActionBlock<EmployeeName>((output) =>
{
using (var cxt = new SandboxContext())
{
cxt.EmployeeNames.Add(output);
cxt.SaveChanges();
}
});
_actionEmployeeAge = new ActionBlock<EmployeeAge>((output) =>
{
using (var cxt = new SandboxContext())
{
cxt.EmployeeAges.Add(output);
cxt.SaveChanges();
}
});
_actionEmployeeAddress = new ActionBlock<EmployeeAddress>((output) =>
{
using (var cxt = new SandboxContext())
{
cxt.EmployeeAddresses.Add(output);
cxt.SaveChanges();
}
});
var linkOpts = new DataflowLinkOptions()
{
PropagateCompletion = true
};
// Transform Employees and pass to Batch
_transEmployeeA.LinkTo(_broadcastBaseMsg,linkOpts);
_transEmployeeB.LinkTo(_broadcastBaseMsg,linkOpts);
// Transform Broadcast to respective outputs
_broadcastBaseMsg.LinkTo(_transEmployeeName,linkOpts);
_broadcastBaseMsg.LinkTo(_transEmployeeAge,linkOpts);
_broadcastBaseMsg.LinkTo(_transEmployeeAddress,linkOpts);
// Add outputs to Buffer
_transEmployeeName.LinkTo(_bufferEmployeeName,linkOpts);
_transEmployeeAge.LinkTo(_bufferEmployeeAge,linkOpts);
_transEmployeeAddress.LinkTo(_bufferEmployeeAddress,linkOpts);
// Persist outputs to DB
_bufferEmployeeName.LinkTo(_actionEmployeeName,linkOpts);
_bufferEmployeeAge.LinkTo(_actionEmployeeAge,linkOpts);
_bufferEmployeeAddress.LinkTo(_actionEmployeeAddress,linkOpts);
@TheodorZoulias 的其他评论有助于简化我对这个特定数据流的 TPL 数据流的使用。