Luigi Workflows 中的条件分支

问题描述

我是 luigi 的新手,并试图在我的流程中创建一个条件分支。分支任务会评估一个条件,并根据结果跳过它的一些子任务。

为了测试这一点,我只有一个虚拟任务,它检查当前小时,如果流程在早上执行,则返回 True,否则返回 False。此任务有两个子任务,一个在控制台中打印“Morning”,另一个打印“Afternoon”。根据分支任务的结果,一个被激活,另一个被跳过。这是在 Prefect 中的样子:

Prefect version

您可以在这里看到流程是在上午执行的,因此跳过了下午的任务。

经过一番研究,我不知道路易吉是否有能力做这样的事情。到目前为止我尝试过的是:

# Third Task
class Branch(luigi.Task):
    def requires(self):
        return Sleep()

    def output(self):
        return luigi.LocalTarget('condition.txt')

    def run(self):
        date = str(datetime.Now())
        hour = int (date.split()[1].split(':')[0])
        with self.output().open('w') as out:
            if hour<12:
                out.write('morning')
                open("afternoon.txt","w").close() # create afternoon Target skips afternoon task
            else:
                out.write('afternoon')
                open("morning.txt","w").close() # create morning Target skips afternoon task

# Fourth Task depends on result of branch
class Morning(luigi.Task):
    def requires(self):
        return Branch()

    def output(self):
        return luigi.LocalTarget('morning.txt')

    def run(self):
        print ("Morning")
        with self.output().open('w') as out:
            out.write("Morning")


class Afternoon(luigi.Task):
    def requires(self):
        return Branch()

    def output(self):
        return luigi.LocalTarget('afternoon.txt')

    def run(self):
        print ("Afternoon")
        with self.output().open('w') as out:
            out.write("Afternoon")

# Fifth task is a merge after the branch
class Merge(luigi.WrapperTask):
    def requires(self):
        yield Morning()
        yield Afternoon()

    def run(self):
        print ("Merged")

据我所知,luigi 只会在其输出尚不存在的情况下执行任务。我当时的想法是创建任务的输出文件来跳过以防止它执行,但它不起作用。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)