pymongo:无法使用存储在 Redis 中的恢复令牌

问题描述

我正在尝试使用 pymongo 来侦听更改流,同时对故障/重新启动具有弹性。因此,我将恢复令牌存储在 Redis 中以备后用。

这是我试过的代码

# Skipping pymongo and redis initialisation

if redis.exists('resumetoken'):
   resume_token = pickle.loads(redis.get('resumetoken'))
else:
   resume_token = None
stream = db.my_collection.watch(
            pipeline=[{'$match': {'fullDocument.type': 'something'}}],resume_after=resume_token,full_document='updateLookup',max_await_time_ms=500
        )
change = stream.next()
redis.set('resumetoken',pickle.dumps(change['_id']))
print(change)

第一次运行时,会打印一个更改,然后我将一些内容存储在 Redis 中。

但是如果我重新运行,就会出现以下错误

pymongo.errors.OperationFailure: The resume token UUID does not exist. Has the collection been dropped?

我使用的是 MongoDB 3.6.1 和 pymongo 3.7.1-1.1。

有人知道为什么我的简历令牌被拒绝了吗?

解决方法

首先,这不是用于恢复更改流的正确模式。请参阅 https://docs.mongodb.com/ruby-driver/master/tutorials/ruby-driver-change-streams/#resuming-a-change-stream,根据需要适应 Python 或查看 pymongo 文档以获取等效内容。

其次,resume token 是一个文档。如果天真地pickle 和unpickling 不起作用,首先将其转换为本地Python 数据类型(例如序列化为扩展的JSON 文档),然后pickle/unpickle 该序列化。恢复时您需要重新创建正确类型的实例。

第三,确认在你阅读了你写给 redis 的东西后,你得到的正是你写的东西。 Redis 不是一个通用的持久化系统。