How to create nested branches in Metaflow?

Question

I am using Metaflow to build a text processing pipeline, as shown below:

    A --+--> B --+--> D --+--> F --+
        |        |        |        |
        |        |        +--> G --+
        |        |                 |
        |        +--> E -----------+--> H
        |                          |
        +--> C --------------------+

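According to the documentation, a branch lets steps be computed in parallel, and I use it to compute (B,C), (D,E) and (F,G) in parallel. Finally, all the branches are joined at H. Here is the code that implements this logic: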

from metaflow import FlowSpec, step

class TextProcessing(FlowSpec):

  @step
  def start(self):
    # Step A in the diagram (Metaflow requires the first step to be named `start`)
    ...

    self.next(self.b, self.c)

  @step
  def c(self):
    # Assign to self so the value becomes an artifact visible to the join
    self.result1 = {}

    ...

    self.next(self.join)

  @step
  def b(self):
    ...

    self.next(self.d, self.e)

  @step
  def e(self):
    self.result2 = []
    ...

    self.next(self.join)

  @step
  def d(self):
    ...

    self.next(self.f, self.g)

  @step
  def f(self):
    self.result3 = []
    ...

    self.next(self.join)

  @step
  def g(self):
    self.result4 = []
    ...

    self.next(self.join)


  @step
  def join(self, results):
    # Step H: a single join for the branch tips c, e, f and g
    data = [results.c.result1, results.e.result2, results.f.result3, results.g.result4]
    print(data)

    self.next(self.end)

  @step
  def end(self):
    pass

if __name__ == '__main__':
  TextProcessing()

When I run python main.py run, I get the following error:

Metaflow 2.2.10 executing TextProcessing for user:ubuntu
Validating your flow...
    Validity checker found an issue on line 83:
    Step join seems like a join step (it takes an extra input argument) but an incorrect number of steps (c,e,f,g) lead to it. This join was expecting 2 incoming paths,starting from splitted step(s) f,g.
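The message comes from Metaflow's split/join balance check. As a rough illustration, here is a simplified model written for this post, not Metaflow's actual linter code: every split pushes its branches onto a stack, and a join must close the innermost open split with exactly that many incoming paths. The helper check_split_join_balance and the graph encoding (step -> (is_join, next steps)) are hypothetical, and step A is written as start because Metaflow requires a start step.

```python
# Simplified model of Metaflow's split/join balance check (hypothetical helper,
# not Metaflow's implementation). It only compares the number of incoming paths
# of a join with the number of branches of the innermost open split.

def check_split_join_balance(graph, start="start"):
    """graph maps step name -> (is_join, [next step names]).
    Returns the first error message found, or None if every split is joined."""
    # Record which steps lead into each step
    parents = {name: [] for name in graph}
    for name, (_, nxt) in graph.items():
        for child in nxt:
            parents[child].append(name)

    def visit(name, stack):
        is_join, nxt = graph[name]
        if is_join:
            if not stack:
                return f"Step {name} is a join but no split is open"
            roots = stack[-1]      # branches of the innermost open split
            stack = stack[:-1]     # the join closes that split
            if len(parents[name]) != len(roots):
                return (f"Step {name}: {len(parents[name])} incoming paths "
                        f"({','.join(parents[name])}), expected {len(roots)} "
                        f"from split step(s) {','.join(roots)}")
        if not nxt:                # the end step: every split must be closed
            return f"Step {name} reached with unjoined branches" if stack else None
        if len(nxt) > 1:           # a split opens a new level
            stack = stack + [nxt]
        for child in nxt:
            error = visit(child, stack)
            if error:
                return error
        return None

    return visit(start, [])


# The question's graph: one join for all four branch tips
question_flow = {
    "start": (False, ["b", "c"]),
    "b": (False, ["d", "e"]),
    "c": (False, ["join"]),
    "d": (False, ["f", "g"]),
    "e": (False, ["join"]),
    "f": (False, ["join"]),
    "g": (False, ["join"]),
    "join": (True, ["end"]),
    "end": (False, []),
}

# One join per split, innermost split first
nested_flow = {
    "start": (False, ["b", "c"]),
    "b": (False, ["d", "e"]),
    "c": (False, ["merge_3"]),
    "d": (False, ["f", "g"]),
    "e": (False, ["merge_2"]),
    "f": (False, ["merge_1"]),
    "g": (False, ["merge_1"]),
    "merge_1": (True, ["merge_2"]),
    "merge_2": (True, ["merge_3"]),
    "merge_3": (True, ["end"]),
    "end": (False, []),
}

print(check_split_join_balance(question_flow))
# Step join: 4 incoming paths (c,e,f,g), expected 2 from split step(s) f,g
print(check_split_join_balance(nested_flow))
# None
```

The first path the traversal follows is start, b, d, f, join, so the innermost open split at that point is the one at d, which opened only f and g. That mirrors the wording of the real error ("expecting 2 incoming paths, starting from splitted step(s) f,g").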

Can someone point out where I went wrong?

Solution

After going through the docs again carefully, I realized that I was not handling the joins correctly. According to the metaflow-2.2.10 documentation:

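Note that you can nest branches arbitrarily, that is, you can branch inside a branch. Just remember to join all the branches that you create.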

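This means that every branch has to be joined. To combine values coming from the branches, Metaflow provides the utility function merge_artifacts, which helps propagate unambiguous values.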
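To make the description of merge_artifacts more concrete, here is a plain-Python model of the behaviour described in its docstring: artifacts already set in the current join step are skipped, values that agree on every incoming branch are copied over, disagreeing values raise an error, and include/exclude restrict which names are considered. This is a hypothetical, simplified re-implementation over dicts, not the library's code; the real method works on the step inputs and compares the stored artifacts rather than using == on Python values.

```python
# Hypothetical, simplified model of Metaflow's merge_artifacts semantics on
# plain dicts; the real method works on join-step inputs and stored artifacts.

def merge_artifacts(current, inputs, include=None, exclude=None):
    """current: artifacts already set in the join step (updated in place).
    inputs: one dict of artifacts per incoming branch."""
    if include and exclude:
        raise ValueError("include and exclude are mutually exclusive")
    to_merge, conflicts = {}, set()
    for branch in inputs:
        for name, value in branch.items():
            if name in current:                  # set in this step: never overwritten
                continue
            if include and name not in include:
                continue
            if exclude and name in exclude:
                continue
            if name in to_merge and to_merge[name] != value:
                conflicts.add(name)              # ambiguous: branches disagree
            to_merge.setdefault(name, value)
    if conflicts:
        raise ValueError(f"conflicting artifacts: {sorted(conflicts)}")
    # Every name listed in include must end up available
    missing = [n for n in (include or []) if n not in to_merge and n not in current]
    if missing:
        raise ValueError(f"artifacts not found: {missing}")
    current.update(to_merge)
    return current


# x was inherited unchanged by both branches, so it is unambiguous
print(merge_artifacts({}, [{"x": 5, "b_var": 1}, {"x": 5, "c_var": 2}]))
# {'x': 5, 'b_var': 1, 'c_var': 2}

# As in merge_2/merge_3 of the answer: result is already set in the join step,
# so include=['result'] leaves it untouched
print(merge_artifacts({"result": "mine"}, [{"result": "other"}], include=["result"]))
# {'result': 'mine'}
```

The second call shows why, in the answer below, the explicit self.result = {...} assignment is what actually builds the merged value: once an artifact is set in the join step, merge_artifacts does not touch it.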

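Since the workflow splits three times (at A, B and D), I added three join steps, merge_1, merge_2 and merge_3, one per split. They close the innermost split first and pass the combined result outward.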
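The nesting decides the join order: merge_1 closes the split at D (f and g), merge_2 the split at B (e and merge_1), and merge_3 the outermost split at A (c and merge_2). The dictionary unpacking used below to grow result can be traced in plain Python; the values here are hypothetical placeholders standing in for the elided processing.

```python
# Hypothetical stand-ins for the artifacts produced by steps c, e, f and g
result1, result2, result3, result4 = {"doc": 1}, ["e"], ["f"], ["g"]

# merge_1: joins f and g (the split at D)
merge_1 = {"result4": result4, "result3": result3}
# merge_2: joins e and merge_1 (the split at B); ** copies merge_1's keys in
merge_2 = {"result2": result2, **merge_1}
# merge_3: joins c and merge_2 (the split at A)
merge_3 = {"c": result1, **merge_2}

print(merge_3)
# {'c': {'doc': 1}, 'result2': ['e'], 'result4': ['g'], 'result3': ['f']}
```

Each join only sees its own two inputs, so the outer joins can reach f's and g's values only through the dictionary that merge_1 produced.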

The following changes worked for me:

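from metaflow import FlowSpec, step

class TextProcessing(FlowSpec):

  @step
  def start(self):
    # Step A in the diagram (Metaflow requires the first step to be named `start`)
    ...

    self.next(self.b, self.c)

  @step
  def c(self):
    # Assign to self so the value becomes an artifact visible to the joins
    self.result1 = {}

    ...

    self.next(self.merge_3)

  @step
  def b(self):
    ...

    self.next(self.d, self.e)

  @step
  def e(self):
    self.result2 = []
    ...

    self.next(self.merge_2)

  @step
  def d(self):
    ...

    self.next(self.f, self.g)

  @step
  def f(self):
    self.result3 = []
    ...

    self.next(self.merge_1)

  @step
  def g(self):
    self.result4 = []
    ...

    self.next(self.merge_1)

  @step
  def merge_1(self, results):
    # Innermost join: closes the split at d (branches f and g)
    self.result = {
      'result4': results.g.result4, 'result3': results.f.result3
    }

    self.next(self.merge_2)

  @step
  def merge_2(self, results):
    # Closes the split at b (branch e and the d branch, now merge_1)
    self.result = {'result2': results.e.result2, **results.merge_1.result}
    # merge_artifacts only considers artifacts not already set in this step,
    # so the self.result assigned above is kept as-is
    self.merge_artifacts(results, include=['result'])
    self.next(self.merge_3)

  @step
  def merge_3(self, results):
    # Outermost join: closes the split at start (branch c and the b branch, now merge_2)
    self.result = {'c': results.c.result1, **results.merge_2.result}
    self.merge_artifacts(results, include=['result'])
    self.next(self.end)

  @step
  def end(self):
    print(self.result)

if __name__ == '__main__':
  TextProcessing()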