ruby – 订阅队列,接收1条消息,然后取消订阅

我有一个场景,我需要非常快速地分配和处理作业.我将在队列中快速填充大约45个作业,我可以同时处理大约20个(5台机器,每个4个核心).每个作业需要花费不同的时间,并且使事情变得复杂,垃圾收集是一个问题所以我需要能够让消费者离线进行垃圾收集.

目前,我有一切与pop一起工作(每个消费者每5ms弹出一次).这似乎是不合需要的,因为它转换为每秒600次popm请求到rabbitmq.

我很乐意,如果有一个pop命令会像订阅一样,但只有一条消息. (进程会阻塞,等待来自rabbitMQ连接的输入,通过类似于Kernel.select的东西)

我试图欺骗AMQP gem来做这样的事情,但它不起作用:在队列为空并且不再向消费者发送消息之前,我似乎无法取消订阅.其他取消订阅方法我担心会失去信息.

consume_1.rb:

require "amqp"

EventMachine.run do
  puts "connecting..."
  connection = AMQP.connect(:host => "localhost",:user => "guest",:pass => "guest",:vhost => "/")
  puts "Connected to AMQP broker"

  channel = AMQP::Channel.new(connection)
  queue = channel.queue("tasks",:auto_delete => true)
  exchange = AMQP::Exchange.default(channel)

  queue.subscribe do |payload|
    puts "Received a message: #{payload}."
    queue.unsubscribe { puts "unbound" }
    sleep 3
  end
end

consumer_many.rb:

require "amqp"

# Imagine the command is something cpu - intensive like image processing.
command = "sleep 0.1"

EventMachine.run do
  puts "connecting..."
  connection = AMQP.connect(:host => "localhost",:auto_delete => true)
  exchange = AMQP::Exchange.default(channel)

  queue.subscribe do |payload|
    puts "Received a message: #{payload}."
  end
end

producer.rb:

require "amqp"

i = 0
EventMachine.run do
  connection = AMQP.connect(:host => "localhost",:auto_delete => true)
  exchange = AMQP::Exchange.default(channel)

  EM.add_periodic_timer(1) do
    msg = "Message #{i}"
    i+=1
    puts "~ publishing #{msg}"
  end
end

我将启动consume_many.rb和producer.rb.消息将按预期流动.

当我启动consume_1.rb时,它会获取所有其他消息(如预期的那样).但它永远不会取消订阅,因为它永远不会完成处理它的所有消息……所以就这样了.

我如何让consume_1.rb订阅队列,获取单个消息,然后将自己从负载均衡器环中取出,这样它就可以完成它的工作,而不会丢失可能在队列中的任何其他待处理作业否则会被安排发送到这个过程?

蒂姆

解决方法

这是AMQP gem的一个简单但很难记录的功能,你需要的是:

在您的消费者中:

channel = AMQP::Channel.new(connection,:prefetch => 1)

然后使用您的订阅块,执行:

queue.subscribe(:ack => true) do |queue_header,payload|
  puts "Received a message: #{payload}."
  # Do long running work here

  # AckNowledge message
  queue_header.ack
end

这是什么告诉RabbitMQ一次只发送你的消费者1消息,而不是发送另一条消息,直到你在长时间运行一个长时间运行的任务后调用队列头上的ack.

我可能需要对此进行纠正,但我相信直接交换会更适合这项任务.

相关文章

validates:conclusion,:presence=>true,:inclusion=>{...
一、redis集群搭建redis3.0以前,提供了Sentinel工具来监控各...
分享一下我老师大神的人工智能教程。零基础!通俗易懂!风趣...
上一篇博文 ruby传参之引用类型 里边定义了一个方法名 mo...
一编程与编程语言 什么是编程语言? 能够被计算机所识别的表...
Ruby类和对象Ruby是一种完美的面向对象编程语言。面向对象编...