在执行具有许多子工作流程/活动的工作流程时,会频繁抛出WorkflowRejectedExecutionError

问题描述

我正在评估使用Cadence执行长时间运行的批量操作。我有以下(科特琳)代码:

class UpdateNameBulkWorkflowImpl : UpdateNameBulkWorkflow {

    private val changeNamePromises = mutableListOf<Promise<ChangeNameResult>>()

    override fun updateNames(newName: String,entityIds: Collection<String>) {
        entityIds.forEach { entityId ->
            val childWorkflow = Workflow.newChildWorkflowStub(
                    UpdateNameBulkWorkflow.UpdateNameSingleWorkflow::class.java
            )
            val promise = Async.function(childWorkflow::setName,newName,entityId)

            changeNamePromises.add(promise)
        }

        val allDone = Promise.allOf(changeNamePromises)
        allDone.get()
    }

    class UpdateNameSingleWorkflowImpl : UpdateNameBulkWorkflow.UpdateNameSingleWorkflow {
        override fun setName(newName: String,entityId: String): SetNameResult {
            return Async.function(activities::setName,entityId).get()
        }
    }
}

这对于数量较少的实体很好,但是我很快遇到了以下异常:

java.lang.RuntimeException: Failure processing decision task. WorkflowID=b5327d20-6ea6-4aba-b863-2165cb21e038,RunID=c85e2278-e483-4c81-8def-f0cc0bd309fd
    at com.uber.cadence.internal.worker.WorkflowWorker$TaskHandlerImpl.wrapFailure(WorkflowWorker.java:283) ~[cadence-client-2.7.4.jar:na]
    at com.uber.cadence.internal.worker.WorkflowWorker$TaskHandlerImpl.wrapFailure(WorkflowWorker.java:229) ~[cadence-client-2.7.4.jar:na]
    at com.uber.cadence.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:76) ~[cadence-client-2.7.4.jar:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: com.uber.cadence.internal.sync.WorkflowRejectedExecutionError: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@7f17a605[Not completed,task = java.util.concurrent.Executors$RunnableAdapter@7fa9f240[Wrapped task = com.uber.cadence.internal.sync.WorkflowThreadImpl$RunnableWrapper@1a27000b]] rejected from java.util.concurrent.ThreadPoolExecutor@22188bd0[Running,pool size = 600,active threads = 600,queued tasks = 0,completed tasks = 2400]
    at com.uber.cadence.internal.sync.WorkflowThreadImpl.start(WorkflowThreadImpl.java:281) ~[cadence-client-2.7.4.jar:na]
    at com.uber.cadence.internal.sync.AsyncInternal.execute(AsyncInternal.java:300) ~[cadence-client-2.7.4.jar:na]
    at com.uber.cadence.internal.sync.AsyncInternal.function(AsyncInternal.java:111) ~[cadence-client-2.7.4.jar:na]
...

看来我很快就耗尽了线程池,而Cadence无法安排新任务。

我已经通过将updateNames的定义更改为:

来解决此问题。
    override fun updateNames(newName: String,entityIds: Collection<String>) {

        entityIds.chunked(200).forEach { sublist ->
            val promises = sublist.map { entityId ->
                val childWorkflow = Workflow.newChildWorkflowStub(
                        UpdateNameBulkWorkflow.UpdateNameSingleWorkflow::class.java
                )
                Async.function(childWorkflow::setName,entityId)
            }

            val allDone = Promise.allOf(promises)
            allDone.get()
        }
    }

这基本上处理了200个块中的项目,并等待每个块完成,然后再移动到下一个。我担心它的执行效果(重试时,块中的单个错误将停止处理以下块中的所有记录)。我还担心在崩溃时Cadence能够恢复该功能的进度。

我的问题是:是否有一种惯用的Cadence方法做到这一点,而不会立即导致资源枯竭?我使用的是错误的技术还是仅仅是天真的方法?

解决方法

Cadence工作流程对单个工作流程运行的大小具有相对较小的限制。它随着并行工作流运行的数量而扩展。因此,在一个工作流程中执行大量任务是一种反模式。

惯用的解决方案是:

  • 运行有限大小的钱箱,然后以新号码继续通话。这样就限制了单个运行大小。
  • 使用分层工作流程。单亲有1000个孩子,每个孩子执行1k个活动,则可以执行100万个活动,从而使每个工作流历史记录的大小受限。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...