问题描述
所以...为了响应 API 调用,我会这样做:
i = CertainObject(paramA=1,paramB=2)
i.save()
现在我的作者数据库有了新记录。
处理可能需要一点时间,我不希望推迟对 API 调用者的响应,所以下一行我将使用 Celery 将对象 ID 传输到异步作业:
run_async_job.delay(i.id)
立即或几秒钟后,取决于队列 run_async_job
尝试从提供该 ID 的数据库加载记录。这是一场赌博。有时有效,有时不取决于只读副本是否更新。
是否有一种模式可以保证成功并且不必在阅读前“睡觉”几秒钟或希望好运?
谢谢。
解决方法
最简单的方法似乎是使用 Greg 和 Elrond 在他们的回答中提到的重试。如果您使用的是 shared_task 或 @app.task 装饰器,则可以使用以下代码片段。
@shared_task(bind=True)
def your_task(self,certain_object_id):
try:
certain_obj = CertainObject.objects.get(id=certain_object_id)
# Do your stuff
except CertainObject.DoesNotExist as e:
self.retry(exc=e,countdown=2 ** self.request.retries,max_retries=20)
我在每次重试之间使用了指数倒计时。您可以根据自己的需要进行修改。
您可以找到自定义重试延迟 here 的文档。 还有另一个文档解释了这个 link
中的指数退避当您调用 retry 时,它将使用相同的任务 ID 发送一条新消息,并确保将消息传递到与原始任务相同的队列。您可以在文档 here
中阅读更多相关信息 ,因为写入然后立即加载它是一个高优先级,那么为什么不将它存储在基于内存的数据库中,如 Memcache 或 Redis。因此,一段时间后,您可以使用 celery 中的定期作业将其写入数据库,该作业将每分钟运行一次。写入数据库完成后,它会从Redis/Memcache中删除键。
您可以将数据保存在基于内存的数据库中一段时间,比如最需要数据的 1 小时。你也可以创建一个服务方法,它会检查数据是否在内存中。
Django Redis 是连接 redis 的绝佳软件包(如果您在 Celery 中将其用作代理)。
我提供了一些基于 Django 缓存的示例:
# service method
from django.core.cache import cache
def get_object(obj_id,model_cls):
obj_dict = cache.get(obj_id,None) # checks if obj id is in cache,O(1) complexity
if obj_dict:
return model_cls(**obj_dict)
else:
return model_cls.objects.get(id=obj_id)
# celery job
@app.task
def store_objects():
logger.info("-"*25)
# you can use .bulk_create() to reduce DB hits and faster DB entries
for obj_id in cache.keys("foo_*"):
CertainObject.objects.create(**cache.get(obj_id))
cache.delete(obj_id)
logger.info("-"*25)
,
最简单的解决方案是捕获在任务开始时抛出的任何 DoesNotExist
错误,然后安排重试。这可以通过将 run_async_job
转换为 Bound Task
来完成:
@app.task(bind=True)
def run_async_job(self,object_id):
try:
instance = CertainObject.objects.get(id=object_id)
except CertainObject.DoesNotExist:
return self.retry(object_id)
,
本文深入探讨了如何处理复制数据库的先写后读问题:https://medium.com/box-tech-blog/how-we-learned-to-stop-worrying-and-read-from-replicas-58cc43973638。
像作者一样,我知道没有万无一失的万能方法来处理写后读不一致的问题。
我之前使用的主要策略是使用某种 expect_and_get(pk,max_attempts=10,delay_seconds=5)
方法尝试获取记录,并尝试 max_attempts
次,两次尝试之间延迟 delay_seconds
秒.这个想法是它“期望”记录存在,因此它将一定数量的失败视为只是暂时的数据库问题。它比仅仅休眠一段时间要可靠一些,因为它可以更快地获取记录并希望减少延迟作业执行的频率。
另一种策略是延迟从特殊的 save_to_read
方法返回,直到只读副本具有该值,或者通过以某种方式同步推送新值到只读副本,或者只是轮询它们直到它们返回记录。这种方式在 IMO 看来有点 hackier。
对于大量读取,您可能不必担心写后读一致性:
如果我们渲染用户所属企业的名称,如果管理员更改它的极少数情况下,需要一分钟时间才能将更改传播到企业的名称,那么这真的没什么大不了的用户。