问题描述
我们有一个大型应用程序,它使用django作为ORM,并使用celery作为运行基础结构的任务。 我们运行由事件(用户驱动或自动)触发的复杂管道,如下所示:
def pipeline_a:
# all lines are synchronous,so second line must happen after first is finished successfully
first_res = a1()
all_results = in_parallel.do(a2,a3,a4)
a5(first_res,all_results)
我们希望在不同的机器上运行a1,a2,...(每个任务可能需要不同的资源),并且并行运行管道的数量始终在变化。 今天,我们使用celery来实现上述功能非常方便-但不适合自动缩放(我们砍掉了celery以使其与kubernetes一起使用,但是它没有本机支持)。
主要要解决的问题是:
- 仅在完成所有先前步骤之后才如何“运行下一个管道步骤”(我可能事先不知道将运行哪些步骤-这取决于先前步骤的结果,因此这些步骤本质上是动态的)
- 今天,我们尝试使用kubernetes(EKS)自动缩放某些任务(SQS队列大小是hpa指标)。如何使kubernetes不尝试终止当前正在运行的任务,但是如果有新任务到达队列(仍然要花费大约半小时才能完成),仍然可以“启动pods”
到目前为止,我的经验表明,解决1芹菜是最方便的方法,但是与2冲突。那么,如何在没有芹菜的情况下解决1,然后如何利用kubernetes来解决长期运行的任务?>
解决方法
如果我正确理解了您的问题,
- 您有一个异步作业,该作业最多可以运行30分钟。
- 作业在K8上运行。
- 当前作业的输出可以决定下一个作业。
- 您具有使用SQS的能力。
您可以为每个任务维护队列。为每个队列实现一个使用者。首先使用Django将任务添加到“ a1”。更新数据库中的作业状态。
a1的使用者完成执行后,将更新db中的状态并推送到右队列。假设为“ a3”。 消费者“ a3”将阅读该任务。更新数据库。执行。将任务推入正确的队列。更新数据库。
如果使用SQS,则将无限任务存储在队列中。您将必须根据SQS队列的大小来增加使用方数量。为此,您可以使用https://github.com/Wattpad/kube-sqs-autoscaler