问题描述
我在同一网络中有3台服务器。在每个服务器上,redis服务和某种生产者正在运行。生产者将作业排队到名为tasks
的本地rq队列中。
因此,每个服务器都有自己的tasks
队列。
此外,还有另外一台服务器正在运行rq worker。可以让该工作人员检查3台服务器中每台服务器上的tasks
队列吗?
我尝试创建连接列表
import redis
from rq import Queue,Worker
from rq import push_connection
# urls = [url1,url2,url3]
connections = list(map(redis.from_url,urls))
然后我用它来创建队列列表。
queues = list(map(lambda c: Queue('tasks',connection=c),connections))
然后我推动所有连接
for connection in connections:
push_connection(connection)
并将队列传递到Worker
Worker(queues=queues).work()
这导致工作人员仅在上次推送的任何连接上监听tasks
。
我一直在研究rq上的代码,我想我可以编写一个自定义工作程序类来执行此操作,但是在我这样做之前,我想问一问是否还有其他方法。甚至可能完全是另一个排队框架?
解决方法
好的,我解决了这个问题。我仍然不确定我是否有权在此处发布实际的源代码,因此我将概述我的解决方案。
我必须覆盖register_birth(self)
,register_death(self)
和dequeue_job_and_maintain_ttl(self,timeout)
。这些功能的原始实现可以在here中找到。
register_birth
基本上,您必须遍历所有连接,push_connection(connection)
,完成注册过程,然后pop_connection()
。
请注意仅在mapping
变量中列出与该连接相对应的队列。原始实现使用queue_names(self)
获取队列名称列表。您只需要对相关队列做queue_names(self)
做的同样的事情。
register_death
基本上与register_birth
相同。遍历所有连接push_connection(connection)
,完成与原始实现和pop_connection()
相同的步骤。
dequeue_job_and_maintain_ttl
我们来看看original implementation of this function。在进入try
块之前,我们将希望所有内容保持不变。在这里,我们想无限循环地遍历所有连接。您可以使用itertools.cycle来做到这一点。
在循环push_connection(connection)
中,并将self.connection
设置为当前连接。如果缺少self.connection = connection
,则可能无法正确返回作业结果。
现在,我们将继续调用self.queue_class.dequeue_any
,类似于原始实现。但是我们将超时设置为1
,以便在当前连接没有可供该工人使用的工作的情况下,继续检查另一连接。
确保使用与当前连接相对应的队列列表调用self.queue_class.dequeue_any
。在这种情况下,queues
仅包含相关队列。
result = self.queue_class.dequeue_any(
queues,1,connection=connection,job_class=self.job_class)
然后pop_connection()
,然后对result
进行与原始实现相同的检查。如果result
不是None
,我们已经找到工作要做,需要break
跳出循环。
保留原始实施中的所有其他内容。不要忘记break
块末尾的try
。它打破了while True
循环。
另一件事
队列包含对其连接的引用。您可以使用它来创建(connection,queues)
的列表,其中queues
包含所有具有连接connection
的队列。
如果将结果列表传递给itertools.cycle,则会得到覆盖dequeue_job_and_maintain_ttl
所需的无穷迭代器。