具有多个连接的Rq Worker register_birth register_death dequeue_job_and_maintain_ttl 另一件事

问题描述

我在同一网络中有3台服务器。在每个服务器上,redis服务和某种生产者正在运行。生产者将作业排队到名为tasks的本地rq队列中。 因此,每个服务器都有自己的tasks队列。

此外,还有另外一台服务器正在运行rq worker。可以让该工作人员检查3台服务器中每台服务器上的tasks队列吗?

我尝试创建连接列表

import redis
from rq import Queue,Worker
from rq import push_connection
# urls = [url1,url2,url3]
connections = list(map(redis.from_url,urls))

然后我用它来创建队列列表。

queues = list(map(lambda c: Queue('tasks',connection=c),connections))

然后我推动所有连接

for connection in connections:
    push_connection(connection)

并将队列传递到Worker

Worker(queues=queues).work()

这导致工作人员仅在上次推送的任何连接上监听tasks

我一直在研究rq上的代码,我想我可以编写一个自定义工作程序类来执行此操作,但是在我这样做之前,我想问一问是否还有其他方法。甚至可能完全是另一个排队框架?

解决方法

好的,我解决了这个问题。我仍然不确定我是否有权在此处发布实际的源代码,因此我将概述我的解决方案。

我必须覆盖register_birth(self)register_death(self)dequeue_job_and_maintain_ttl(self,timeout)。这些功能的原始实现可以在here中找到。

register_birth

基本上,您必须遍历所有连接,push_connection(connection),完成注册过程,然后pop_connection()

请注意仅在mapping变量中列出与该连接相对应的队列。原始实现使用queue_names(self)获取队列名称列表。您只需要对相关队列做queue_names(self)做的同样的事情。

register_death

基本上与register_birth相同。遍历所有连接push_connection(connection),完成与原始实现和pop_connection()相同的步骤。

dequeue_job_and_maintain_ttl

我们来看看original implementation of this function。在进入try块之前,我们将希望所有内容保持不变。在这里,我们想无限循环地遍历所有连接。您可以使用itertools.cycle来做到这一点。

在循环push_connection(connection)中,并将self.connection设置为当前连接。如果缺少self.connection = connection,则可能无法正确返回作业结果。

现在,我们将继续调用self.queue_class.dequeue_any,类似于原始实现。但是我们将超时设置为1,以便在当前连接没有可供该工人使用的工作的情况下,继续检查另一连接。

确保使用与当前连接相对应的队列列表调用self.queue_class.dequeue_any。在这种情况下,queues仅包含相关队列。

result = self.queue_class.dequeue_any(
    queues,1,connection=connection,job_class=self.job_class)

然后pop_connection(),然后对result进行与原始实现相同的检查。如果result不是None,我们已经找到工作要做,需要break跳出循环。

保留原始实施中的所有其他内容。不要忘记break块末尾的try。它打破了while True循环。

另一件事

队列包含对其连接的引用。您可以使用它来创建(connection,queues)的列表,其中queues包含所有具有连接connection的队列。

如果将结果列表传递给itertools.cycle,则会得到覆盖dequeue_job_and_maintain_ttl所需的无穷迭代器。