问题描述
||
我的数据存储区中有大量实体(产品),这些实体(产品)来自外部数据源。我想每天检查一次更新。
由于应用程序直接获取了某些项目,因此这些项目已经更新。有些是新插入的,不需要更新。
对于尚未获取的文件,我正在运行cron作业。我使用Python API。
目前,我执行以下操作。
我有一个领域
dateupdated = db.DateTimeProperty(auto_Now_add=True)
然后我可以使用
query = dbmodel.product.all()
query.filter(\'dateupdated <\',newdate)
query.order(\'dateupdated\')
results = query.fetch(limit=mylimit,offset=myoffset)
选择最旧的条目并安排它们进行更新。我使用带有自定义任务名称的任务队列来确保每个产品更新每天仅运行一次。
问题是,我需要更新字段dateupdated,这意味着即使没有更改产品数据,也要写数据存储区,只是为了跟踪更新过程。
这会消耗大量资源(cpu时间,Datastore API调用等)。
有没有更好的方法来执行此类任务并避免不必要的数据存储写入?
解决方法
是的,使用游标
通过按ѭ2排序查询,然后在处理完实体后存储游标,可以稍后重新运行同一查询,以仅获取上次查询后更新的项目。
所以,给一个像
class MyEntity(db.model):
dateupdated = db.DateTimeProperty(auto_now_add=True)
您可以将处理程序设置为作为任务运行,例如:
class ProcessNewEntities(webapp.RequestHandler):
def get(self):
\"\"\"Run via a task to process batches of \'batch_size\'
recently updated entities\"\"\"
# number of eneities to process per task execution
batch_size = 100
# build the basic query
q = MyEntity.all().order(\"dateupdated\")
# use a cursor?
cursor = self.request.get(\"cursor\")
if cursor:
q.with_cursor(cursor)
# fetch the batch
entities = q.fetch(batch_size)
for entity in entities:
# process the entity
do_your_processing(entity)
# queue up the next task to process the next 100
# if we have no more to process then delay this task
# for a while so that it doesn\'t hog the application
delay = 600 if len(entities)<batch_size else 0
taskqueue.add(
url=\'/tasks/process_new_entities\',params={\'cursor\': q.cursor()},countdown=delay)
然后只需触发任务执行的开始,例如:
def start_processing_entities():
taskqueue.add(url=\'/tasks/process_new_entities\')