在Django Startup上的ThreadPoolExecutor中运行异步工作

问题描述

我有一个全局对象,可以在视图中完美地工作:

class TaskQueue(object):
    executor: ThreadPoolExecutor = None

    def __init__(self):
        self.executor = ThreadPoolExecutor(max_workers=settings.THREAD_POOL_SIZE)

    def compute_pending_experiments(self):
        experiments: QuerySet = Experiment.objects.filter(condition)
        for experiment in experiments:
            global_task_queue.add_experiment(experiment)

    @staticmethod
    def eval_experiment(experiment: Experiment) -> None:
        # Some stuff

    def add_experiment(self,experiment: Experiment):
        self.executor.submit(self.eval_experiment,experiment)

global_task_queue = TaskQueue()
global_task_queue.compute_pending_experiments()  # THIS line is the problem

问题是最后一行global_task_queue.compute_pending_experiments() 阻塞了 Asgi主线程,我不知道为什么,因为这些任务已提交给ThreadPoolExecutor,这应该是异步的。

这在开发服务器中有效,但在达芙妮中无效,因此我无法在生产中进行部署。 This solution使用钩子对我没有用,因为我需要数据库操作。

我尝试在一个线程中完成所有工作,但无论如何它都会阻塞主线程:

global_task_queue = TaskQueue()

def compute_pending_experiments():
    experiments: QuerySet = Experiment.objects.filter(condition)
    for experiment in experiments:
        global_task_queue.add_experiment(experiment)

threading.Thread(target=compute_pending_experiments).start()

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)