体系结构建议-如何实现自动缩放的异步任务

问题描述

我们有一个大型应用程序,它使用django作为ORM,并使用celery作为运行基础结构的任务。 我们运行由事件(用户驱动或自动)触发的复杂管道,如下所示:


def pipeline_a:
# all lines are synchronous,so second line must happen after first is finished successfully
first_res = a1()
all_results = in_parallel.do(a2,a3,a4)
a5(first_res,all_results)

我们希望在不同的机器上运行a1,a2,...(每个任务可能需要不同的资源),并且并行运行管道的数量始终在变化。 今天,我们使用celery来实现上述功能非常方便-但不适合自动缩放(我们砍掉了celery以使其与kubernetes一起使用,但是它没有本机支持)。

主要要解决的问题是:

  1. 仅在完成所有先前步骤之后才如何“运行下一个管道步骤”(我可能事先不知道将运行哪些步骤-这取决于先前步骤的结果,因此这些步骤本质上是动态的)
  2. 今天,我们尝试使用kubernetes(EKS)自动缩放某些任务(SQS队列大小是hpa指标)。如何使kubernetes不尝试终止当前正在运行的任务,但是如果有新任务到达队列(仍然要花费大约半小时才能完成),仍然可以“启动pods”

到目前为止,我的经验表明,解决1芹菜是最方便的方法,但是与2冲突。那么,如何在没有芹菜的情况下解决1,然后如何利用kubernetes来解决长期运行的任务?>

解决方法

如果我正确理解了您的问题,

  • 您有一个异步作业,该作业最多可以运行30分钟。
  • 作业在K8上运行。
  • 当前作业的输出可以决定下一个作业。
  • 您具有使用SQS的能力。

您可以为每个任务维护队列。为每个队列实现一个使用者。首先使用Django将任务添加到“ a1”。更新数据库中的作业状态。

a1的使用者完成执行后,将更新db中的状态并推送到右队列。假设为“ a3”。 消费者“ a3”将阅读该任务。更新数据库。执行。将任务推入正确的队列。更新数据库。

如果使用SQS,则将无限任务存储在队列中。您将必须根据SQS队列的大小来增加使用方数量。为此,您可以使用https://github.com/Wattpad/kube-sqs-autoscaler

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...