TPL Dataflow block that produces multiple outputs from a single input

Problem description

I have started looking into TPL Dataflow as a solution for a processing workflow.

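The gist of the workflow is to read input messages from several tables, create four corresponding objects from each message and persist them to four other tables, so every input message should result in four new messages being created.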

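I can't find a predefined block that would help create the four objects. At first TransformManyBlock looked like what I was after, but it returns multiple objects of one type, whereas I will have four different types.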
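For reference, a TransformManyBlock can still emit objects of different runtime types if its output type is a common base type. The sketch below assumes `object` as that base type and routes each item to a per-type ActionBlock with a `LinkTo` predicate. The in-memory lists stand in for the database tables and are hypothetical, and the entity classes are trimmed to the fields used.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical in-memory stand-ins for the three target tables.
var names = new List<EmployeeName>();
var ages = new List<EmployeeAge>();
var addresses = new List<EmployeeAddress>();

// One input, several outputs of different runtime types behind a common type (object).
var split = new TransformManyBlock<BaseEmployee, object>(b => new object[]
{
    new EmployeeName { System = b.System, LegacyId = b.Id, Name = b.Name },
    new EmployeeAge { System = b.System, LegacyId = b.Id, Age = b.Age },
    new EmployeeAddress { System = b.System, LegacyId = b.Id, Address = b.Address, Postcode = b.Postcode },
});

// ActionBlocks process one item at a time by default, so each list has a single writer.
var saveName = new ActionBlock<object>(o => names.Add((EmployeeName)o));
var saveAge = new ActionBlock<object>(o => ages.Add((EmployeeAge)o));
var saveAddress = new ActionBlock<object>(o => addresses.Add((EmployeeAddress)o));

var opts = new DataflowLinkOptions { PropagateCompletion = true };
// Route by runtime type. Every item matches exactly one predicate,
// so nothing is left stuck in the source's output buffer.
split.LinkTo(saveName, opts, o => o is EmployeeName);
split.LinkTo(saveAge, opts, o => o is EmployeeAge);
split.LinkTo(saveAddress, opts, o => o is EmployeeAddress);

split.Post(new BaseEmployee { Id = 7, System = 1, Name = "Jane Doe", Age = 42, Address = "1 High St", Postcode = "AB1 2CD" });
split.Complete();
await Task.WhenAll(saveName.Completion, saveAge.Completion, saveAddress.Completion);

Console.WriteLine($"{names.Count} {ages.Count} {addresses.Count}"); // prints "1 1 1"

class BaseEmployee
{
    public int Id { get; set; }
    public int System { get; set; }
    public string Name { get; set; } = "";
    public int Age { get; set; }
    public string Address { get; set; } = "";
    public string Postcode { get; set; } = "";
}

class EmployeeName
{
    public int System { get; set; }
    public int LegacyId { get; set; }
    public string Name { get; set; } = "";
}

class EmployeeAge
{
    public int System { get; set; }
    public int LegacyId { get; set; }
    public int Age { get; set; }
}

class EmployeeAddress
{
    public int System { get; set; }
    public int LegacyId { get; set; }
    public string Address { get; set; } = "";
    public string Postcode { get; set; } = "";
}
```

The trade-off is the loss of static typing between the split and the savers (hence the casts), which is one reason a fan-out with a BroadcastBlock and typed TransformBlocks can be easier to maintain.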

Example

We have two tables holding employee details from two legacy systems. Their entities look like this:

public partial class EmployeeTblA
{
    public int Id { get; set; }
    public int System { get; set; }
    public string Forename { get; set; }
    public string Surname { get; set; }
    public int Age { get; set; }
    public int Number { get; set; }
    public string Street { get; set; }
    public string PostCode { get; set; }

    public virtual EmployeeSystem SystemNavigation { get; set; }
}

public partial class EmployeeTblB
{
    public int Id { get; set; }
    public int System { get; set; }
    public string Name { get; set; }
    public int Age { get; set; }
    public string Address { get; set; }
    public string Postcode { get; set; }

    public virtual EmployeeSystem SystemNavigation { get; set; }
}

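We want to take the data from both systems and put it into our shiny new system. To do that, we need to convert the entities of the old systems into the entities used by the new system. First, we convert the old systems' entities into a base class like this: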

public class BaseEmployee
{
    public int Id { get; set; }
    public int System { get; set; }
    public string Name { get; set; }
    public int Age { get; set; }
    public string Address { get; set; }
    public string Postcode { get; set; }
}

We then want to create three new objects from the base class, representing the new system's entities, like this:

public partial class EmployeeName
{
    public int Id { get; set; }
    public int System { get; set; }
    public int LegacyId { get; set; }
    public string Name { get; set; }

    public virtual EmployeeSystem SystemNavigation { get; set; }
}

public partial class EmployeeAge
{
    public int Id { get; set; }
    public int System { get; set; }
    public int LegacyId { get; set; }
    public int Age { get; set; }

    public virtual EmployeeSystem SystemNavigation { get; set; }
}

public partial class EmployeeAddress
{
    public int Id { get; set; }
    public int System { get; set; }
    public int LegacyId { get; set; }
    public string Address { get; set; }
    public string Postcode { get; set; }

    public virtual EmployeeSystem SystemNavigation { get; set; }
}
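The two conversion steps described above (legacy row to BaseEmployee, then BaseEmployee to the new entities) can be sketched as plain static functions, which keeps them testable outside any dataflow block. This is a minimal sketch: the `Mapping` class is a hypothetical name, the navigation properties are omitted, and joining Number and Street with a space is an assumption about the desired address format.

```csharp
using System;

var a = new EmployeeTblA { Id = 3, System = 1, Forename = "Jane", Surname = "Doe", Age = 42, Number = 12, Street = "High St", PostCode = "AB1 2CD" };
var baseA = Mapping.ToBase(a);
var name = Mapping.ToName(baseA);
Console.WriteLine($"{baseA.Name}|{baseA.Address}|{name.LegacyId}"); // prints "Jane Doe|12 High St|3"

// Hypothetical helper: one pure function per conversion step.
static class Mapping
{
    // Step 1: each legacy shape -> common BaseEmployee
    public static BaseEmployee ToBase(EmployeeTblA a) => new BaseEmployee
    {
        Id = a.Id, System = a.System, Name = $"{a.Forename} {a.Surname}", Age = a.Age,
        Address = $"{a.Number} {a.Street}", Postcode = a.PostCode
    };

    public static BaseEmployee ToBase(EmployeeTblB b) => new BaseEmployee
    {
        Id = b.Id, System = b.System, Name = b.Name, Age = b.Age,
        Address = b.Address, Postcode = b.Postcode
    };

    // Step 2: BaseEmployee -> new-system entities, keeping the legacy Id as LegacyId
    public static EmployeeName ToName(BaseEmployee e) =>
        new EmployeeName { System = e.System, LegacyId = e.Id, Name = e.Name };

    public static EmployeeAge ToAge(BaseEmployee e) =>
        new EmployeeAge { System = e.System, LegacyId = e.Id, Age = e.Age };

    public static EmployeeAddress ToAddress(BaseEmployee e) =>
        new EmployeeAddress { System = e.System, LegacyId = e.Id, Address = e.Address, Postcode = e.Postcode };
}

class EmployeeTblA
{
    public int Id { get; set; }
    public int System { get; set; }
    public string Forename { get; set; } = "";
    public string Surname { get; set; } = "";
    public int Age { get; set; }
    public int Number { get; set; }
    public string Street { get; set; } = "";
    public string PostCode { get; set; } = "";
}

class EmployeeTblB
{
    public int Id { get; set; }
    public int System { get; set; }
    public string Name { get; set; } = "";
    public int Age { get; set; }
    public string Address { get; set; } = "";
    public string Postcode { get; set; } = "";
}

class BaseEmployee
{
    public int Id { get; set; }
    public int System { get; set; }
    public string Name { get; set; } = "";
    public int Age { get; set; }
    public string Address { get; set; } = "";
    public string Postcode { get; set; } = "";
}

class EmployeeName { public int System { get; set; } public int LegacyId { get; set; } public string Name { get; set; } = ""; }
class EmployeeAge { public int System { get; set; } public int LegacyId { get; set; } public int Age { get; set; } }
class EmployeeAddress { public int System { get; set; } public int LegacyId { get; set; } public string Address { get; set; } = ""; public string Postcode { get; set; } = ""; }
```

Each TransformBlock in a pipeline can then simply wrap one of these functions, e.g. `new TransformBlock<EmployeeTblA, BaseEmployee>(Mapping.ToBase)`.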

Rough flow of my TPL Dataflow pipeline for the example above:

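  1. Data is read from the database tables into a TransformBlock that converts it into a common object. This happens twice, once for each legacy system.

  2. Each TransformBlock links to a BatchBlock to group all the input streams.

  3. The BatchBlock links to a block that takes the input and creates two new objects from it, EmployeeName and EmployeeAge.

  4. That block then links to an ActionBlock<EmployeeName> and an ActionBlock<EmployeeAge>, which save the objects to their respective tables in the database.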
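If step 2 is meant only to merge the two input streams, note that a BatchBlock does something different: it groups consecutive messages into fixed-size arrays, so the downstream block would receive `BaseEmployee[]` rather than individual records. A small standalone illustration, with integers in place of employees for brevity:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// BatchBlock<T>(3) groups every three consecutive messages into one T[].
var batcher = new BatchBlock<int>(3);
for (int i = 1; i <= 7; i++) batcher.Post(i);

// Completing the block flushes the leftover items as a final, smaller batch.
batcher.Complete();

var batches = new List<int[]>();
while (await batcher.OutputAvailableAsync())
{
    while (batcher.TryReceive(out int[] batch)) batches.Add(batch);
}

foreach (var b in batches) Console.WriteLine(string.Join(",", b));
// prints "1,2,3", then "4,5,6", then "7"
```

Merging two streams does not need a BatchBlock at all: two sources can be linked directly to the same target block.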

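I know I could create a custom block, but I can't work out how to use one to feed four separately linked ActionBlocks in a dataflow. Can anyone point me in the right direction?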

Solution

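A BroadcastBlock is what I ended up using. It broadcasts each BaseEmployee object to the other output streams, which split off into the corresponding objects I need to create.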
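The fan-out behaviour can be seen in isolation with strings standing in for BaseEmployee (an illustrative simplification). Every linked target is offered each message. Keep in mind that a BroadcastBlock only guarantees delivery to targets that accept the message when it is offered; the unbounded ActionBlocks here always do, but if downstream blocks were given a `BoundedCapacity`, a busy target could miss messages.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var seenByName = new List<string>();
var seenByAge = new List<string>();
var seenByAddress = new List<string>();

// Null cloning function: every target receives the same instance.
var broadcast = new BroadcastBlock<string>(null);

// Unbounded ActionBlocks accept every offered message immediately.
var toName = new ActionBlock<string>(s => seenByName.Add(s));
var toAge = new ActionBlock<string>(s => seenByAge.Add(s));
var toAddress = new ActionBlock<string>(s => seenByAddress.Add(s));

var opts = new DataflowLinkOptions { PropagateCompletion = true };
broadcast.LinkTo(toName, opts);
broadcast.LinkTo(toAge, opts);
broadcast.LinkTo(toAddress, opts);

broadcast.Post("emp-1");
broadcast.Post("emp-2");
broadcast.Complete();
await Task.WhenAll(toName.Completion, toAge.Completion, toAddress.Completion);

Console.WriteLine($"{seenByName.Count} {seenByAge.Count} {seenByAddress.Count}"); // prints "2 2 2"
```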

The full pipeline is as follows:

        // Legacy system A -> common BaseEmployee
        _transEmployeeA = new TransformBlock<EmployeeTblA, BaseEmployee>((input) =>
        {
            return new BaseEmployee()
            {
                Id = input.Id,
                System = input.System,
                Name = string.Concat(input.Forename, " ", input.Surname),
                Age = input.Age,
                Address = string.Concat(input.Number, " ", input.Street),
                Postcode = input.PostCode
            };
        });

        // Legacy system B -> common BaseEmployee
        _transEmployeeB = new TransformBlock<EmployeeTblB, BaseEmployee>((input) =>
        {
            return new BaseEmployee()
            {
                Id = input.Id,
                System = input.System,
                Name = input.Name,
                Age = input.Age,
                Address = input.Address,
                Postcode = input.Postcode
            };
        });

        // Fan each BaseEmployee out to every linked target. A null cloning function means
        // all targets share the same instance, which is fine because they only read from it.
        _broadcastBaseMsg = new BroadcastBlock<BaseEmployee>(null);

        _transEmployeeName = new TransformBlock<BaseEmployee, EmployeeName>((baseMsg) =>
        {
            return new EmployeeName()
            {
                System = baseMsg.System,
                LegacyId = baseMsg.Id,
                Name = baseMsg.Name
            };
        });

        _transEmployeeAge = new TransformBlock<BaseEmployee, EmployeeAge>((baseMsg) =>
        {
            return new EmployeeAge()
            {
                System = baseMsg.System,
                LegacyId = baseMsg.Id,
                Age = baseMsg.Age
            };
        });

        _transEmployeeAddress = new TransformBlock<BaseEmployee, EmployeeAddress>((baseMsg) =>
        {
            return new EmployeeAddress()
            {
                System = baseMsg.System,
                LegacyId = baseMsg.Id,
                Address = baseMsg.Address,
                Postcode = baseMsg.Postcode
            };
        });

        _bufferEmployeeName = new BufferBlock<EmployeeName>();
        _bufferEmployeeAge = new BufferBlock<EmployeeAge>();
        _bufferEmployeeAddress = new BufferBlock<EmployeeAddress>();

        // Persist each entity type to its own table
        _actionEmployeeName = new ActionBlock<EmployeeName>((output) =>
        {
            using (var cxt = new SandboxContext())
            {
                cxt.EmployeeNames.Add(output);
                cxt.SaveChanges();
            }
        });

        _actionEmployeeAge = new ActionBlock<EmployeeAge>((output) =>
        {
            using (var cxt = new SandboxContext())
            {
                cxt.EmployeeAges.Add(output);
                cxt.SaveChanges();
            }
        });

        _actionEmployeeAddress = new ActionBlock<EmployeeAddress>((output) =>
        {
            using (var cxt = new SandboxContext())
            {
                cxt.EmployeeAddresses.Add(output);
                cxt.SaveChanges();
            }
        });

        var linkOpts = new DataflowLinkOptions()
        {
            PropagateCompletion = true
        };

        // Transform employees and pass them to the broadcast block. Completion is not
        // propagated on these two links: otherwise whichever source finished first would
        // complete _broadcastBaseMsg, which would then decline the other source's messages.
        _transEmployeeA.LinkTo(_broadcastBaseMsg);
        _transEmployeeB.LinkTo(_broadcastBaseMsg);
        _ = Task.WhenAll(_transEmployeeA.Completion, _transEmployeeB.Completion)
            .ContinueWith(t => _broadcastBaseMsg.Complete());

        // Transform broadcast to respective outputs
        _broadcastBaseMsg.LinkTo(_transEmployeeName, linkOpts);
        _broadcastBaseMsg.LinkTo(_transEmployeeAge, linkOpts);
        _broadcastBaseMsg.LinkTo(_transEmployeeAddress, linkOpts);

        // Add outputs to buffers
        _transEmployeeName.LinkTo(_bufferEmployeeName, linkOpts);
        _transEmployeeAge.LinkTo(_bufferEmployeeAge, linkOpts);
        _transEmployeeAddress.LinkTo(_bufferEmployeeAddress, linkOpts);

        // Persist outputs to DB
        _bufferEmployeeName.LinkTo(_actionEmployeeName, linkOpts);
        _bufferEmployeeAge.LinkTo(_actionEmployeeAge, linkOpts);
        _bufferEmployeeAddress.LinkTo(_actionEmployeeAddress, linkOpts);
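Driving a pipeline of this shape (two sources feeding one BroadcastBlock) needs some care at completion time: if both inbound links used `PropagateCompletion = true`, whichever source finished first would complete the broadcast block, which would then decline the other source's remaining messages. The runnable sketch below uses integers and strings in place of the entities, and the names `fromA`, `fromB`, `saveName` and `saveAge` are hypothetical. It completes the broadcast block only after `Task.WhenAll` over both sources, then awaits the final blocks.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Two sources standing in for the two legacy-system transforms.
var fromA = new TransformBlock<int, string>(id => $"A{id}");
var fromB = new TransformBlock<int, string>(id => $"B{id}");
var broadcast = new BroadcastBlock<string>(null);

// Two sinks standing in for the per-table ActionBlocks.
var names = new List<string>();
var ages = new List<string>();
var saveName = new ActionBlock<string>(s => names.Add("name:" + s));
var saveAge = new ActionBlock<string>(s => ages.Add("age:" + s));

// No PropagateCompletion on the inbound links; complete the broadcast
// block only once BOTH sources have finished delivering their output.
fromA.LinkTo(broadcast);
fromB.LinkTo(broadcast);
_ = Task.WhenAll(fromA.Completion, fromB.Completion)
    .ContinueWith(t => broadcast.Complete());

var opts = new DataflowLinkOptions { PropagateCompletion = true };
broadcast.LinkTo(saveName, opts);
broadcast.LinkTo(saveAge, opts);

for (int i = 1; i <= 3; i++) fromA.Post(i);
for (int i = 1; i <= 5; i++) fromB.Post(i);
fromA.Complete();
fromB.Complete();

// Completion flows broadcast -> sinks; await the last blocks in the chain.
await Task.WhenAll(saveName.Completion, saveAge.Completion);
Console.WriteLine($"{names.Count} {ages.Count}"); // prints "8 8"
```

As written, the continuation calls `Complete()` even if one of the sources faulted; propagating the fault instead would mean checking `t.IsFaulted` and faulting the broadcast block through `IDataflowBlock.Fault`.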

Further comments from @TheodorZoulias helped simplify my use of TPL Dataflow for this particular flow.