问题描述
我正在编写一个简单的类,用于使用 expiringdict 缓存来自具有生存时间的用户的消息。为了与电报交互,我使用 pyTelegramBotAPI。缓存类:
import threading
from expiringdict import ExpiringDict
class SmartCacher:
def __init__(self,max_cache_len: int=100,max_cache_time_to_live: int=60):
self._rlock = threading.RLock()
self._cache = ExpiringDict(max_len=max_cache_len,max_age_seconds=max_cache_time_to_live)
self._bot = None
self._message_obj = None
self._message_text = None
PREFIX_SINGLE_UNIT_CH_RU = "ch_ru_single_"
PREFIX_SINGLE_UNIT_IN_PSEUDO_BUNCH_CH_RU = "ch_ru_pseudo_bunch_single_"
PREFIX_SINGLE_UNIT_IN_MUL_TRAN_CH_RU = "ch_ru_mul_tran_single_"
PREFIX_SINGLE_UNIT_RU_CH = "ru_ch_single_"
PREFIX_SINGLE_UNIT_IN_MUL_TRAN_RU_CH = "ru_ch_mul_tran_single_"
def init_basic_attrs(self,bot,message) -> None:
self._bot = bot
self._message_obj = message
self._message_text = message.text
def add_response_msg_to_cache(self,initial_user_message: str,val: str,type_='single',dir='CH-RU') -> None:
if dir == 'CH-RU':
usr_tg_id = self._message_obj.chat.id
if type_ == 'single':
key_with_prefixes = f"{usr_tg_id}_{self.PREFIX_SINGLE_UNIT_CH_RU}{initial_user_message}"
with self._rlock:
self._cache[key_with_prefixes] = val
elif type_ == 'pseudo_bunch_single':
prefixes_for_key = f"{usr_tg_id}_{self.PREFIX_SINGLE_UNIT_IN_PSEUDO_BUNCH_CH_RU}"
key_with_prefixes = f"{prefixes_for_key}{initial_user_message}"
with self._rlock:
self._cache[key_with_prefixes] = val
else:
prefixes_for_key = f"{usr_tg_id}_{self.PREFIX_SINGLE_UNIT_IN_MUL_TRAN_CH_RU}"
key_with_prefixes = f"{prefixes_for_key}{initial_user_message}"
with self._rlock:
self._cache[key_with_prefixes] = val
else:
assert type_ != 'pseudo_bunch_single'
if type_ == 'single':
prefixes_for_key = f"{usr_tg_id}_{self.PREFIX_SINGLE_UNIT_RU_CH}"
key_with_prefixes = f"{prefixes_for_key}{initial_user_message}"
with self._rlock:
self._cache[key_with_prefixes] = val
else:
prefixes_for_key = f"{usr_tg_id}_{self.PREFIX_SINGLE_UNIT_IN_MUL_TRAN_RU_CH}"
key_with_prefixes = f"{prefixes_for_key}{initial_user_message}"
with self._rlock:
self._cache[key_with_prefixes] =val
def get_from_cache_if_exists(self,key: str):
try:
return self._cache[key]
except KeyError:
return False
每个用户都应该有自己的缓存。 我的问题是“我是否需要使用 threading.RLock() 来确保在这种情况下我不会在线程之间发生冲突?” 例如这里:
key_with_postfix = self.PREFIX_SINGLE_UNIT_CH_RU + initial_user_message
self._cache[key_with_prefix] = val
我在 handlers(funcs) 之外创建了我的 SmartCacher 实例,这意味着所有线程都有一个共同的缓存对象。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)