码头工人 – 由于Luigi工作分布不均而导致工人死亡(2.6.1)

我们正在尝试运行分布在docker swarm集群上的简单管道. luigi worker被部署为复制的docker服务.他们成功启动,在向luigi-server请求工作几秒钟之后,由于没有为他们分配工作而他们开始死亡,所有任务最终都分配给一个工作人员.

我们不得不在我们工人的luigi.cfg中设置keep_alive = True来强迫他们不要死,但是在管道完成后让工人保持身边似乎是一个坏主意.
有没有办法控制工作分配?

我们的测试管道:

class RunAllTasks(luigi.Task):

    tasks = luigi.IntParameter()
    sleep_time = luigi.IntParameter()

    def requires(self):
        for i in range(self.tasks):
            yield RunExampleTask(i,self.sleep_time)

    def run(self):
        with self.output().open('w') as f:
            f.write('All done!')

    def output(self):
        return LocalTarget('/data/RunAllTasks.txt')


class RunExampleTask(luigi.Task):

    number = luigi.IntParameter()
    sleep_time = luigi.IntParameter()

    @property
    def cmd(self):
        return """
               docker run --rm --name example_{number} hello-world
           """.format(number=self.number)

    def run(self):
        time.sleep(self.sleep_time)
        logger.debug(self.cmd)
        out = subprocess.check_output(self.cmd,stderr=subprocess.STDOUT,shell=True)
        logger.debug(out)
        with self.output().open('w') as f:
            f.write(str(out))

    def output(self):
        return LocalTarget('/data/{number}.txt'.format(number=self.number))


if __name__ == "__main__":
    luigi.run()
最佳答案
您的问题是一次产生一个要求的结果,而您希望立即产生所有这些要求,如下所示:

def requires(self):
    reqs = []
    for i in range(self.tasks):
        reqs.append(RunExampleTask(i,self.sleep_time))
    yield reqs

相关文章

最近一直在开发Apworks框架的案例代码,同时也在一起修复Apw...
最近每天都在空闲时间努力编写Apworks框架的案例代码WeText。...
在《Kubernetes中分布式存储Rook-Ceph部署快速演练》文章中,...
最近在项目中有涉及到Kubernetes的分布式存储部分的内容,也...
CentOS下Docker与.netcore(一) 之 安装 CentOS下Docker与.ne...
CentOS下Docker与.netcore(一) 之 安装 CentOS下Docker与.ne...