通过 Celery 任务调用 DRF ViewSet

问题描述

我有一个 Django Rest Framework ViewSet:

MyModelViewSet(generics.RetrieveUpdateDestroyAPIView):
    def perform_destroy(self,instance):
        # do something besides deleting the object

现在我正在编写一个 Celery 周期性任务,它根据过滤器(假设 end_date < Now删除过期对象。

我希望任务重用并执行在 ViewSet 的 perform_destroy 方法中执行的相同操作。

能做到吗?怎么样?
谢谢!

解决方法

您可以通过在 DRF 中使用 Request 来解决您的问题,安排一个 celery 任务来调用请求。效果很好,我以前实现过。

示例代码:

from rest_framework.request import Request as DRFRequest
from django.conf import settings
from django.http import HttpRequest
from your_module.views import MyModelViewSet

CELERY_CACHING_QUEUE = getattr(settings,"CELERY_CACHING_QUEUE",None)


def delete_resource(resource_pk: int) -> None:
    """
    This method helps to delete the resource by the id.
    """
    print(f'Starting deleting resource {resource_pk}...')

    request = HttpRequest()
    request.method = 'DELETE'
    request.META = {
        'SERVER_NAME': settings.ALLOWED_HOSTS[0],'SERVER_PORT': 443
    }
    drf_request = DRFRequest(request)

    # If your API need user has access permission,# you should handle for getting the value of
    # user_has_access_permission before
    # E.g. below
    # drf_request.user = user_has_access_permission

    try:
        view = MyModelViewSet(
            kwargs={
                'pk': resource_pk
            },request=drf_request
        )
        view.initial(drf_request)
        view.delete(drf_request)
    except (Exception,KeyError) as e:
        print(f'Cannot delete resource: {resource_pk},error: {e}')
        return

    print(f'Finished deleting resource {resource_pk}...')


@task(name="delete_resource_task",queue=CELERY_CACHING_QUEUE)
def delete_resource_task(resource_pk: int) -> None:
    """
    Async task helps to delete resource.
    """
    delete_resource(resource_pk)

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...