处理并发ruby线程池中的异常

如何处理并发 ruby线程池中的异常( http://ruby-concurrency.github.io/concurrent-ruby/file.thread_pools.html)?

例:

pool = Concurrent::FixedThreadPool.new(5) 
pool.post do
  raise 'something goes wrong'
end

# how to rescue this exception here

更新:

这是我的代码的简化版本:

def process
  pool = Concurrent::FixedThreadPool.new(5)

  products.each do |product|
    new_product = generate_new_product

    pool.post do
      store_in_db(new_product) # here exception is raised,e.g. connection to db Failed
    end
  end

  pool.shutdown
  pool.wait_for_terminaton
end

所以我想要实现的是在任何异常的情况下停止处理(中断循环).

此异常也在更高级别的应用程序中获救,并且执行了一些清理作业(例如将模型状态设置为失败并发送一些通知).

解决方法

以下答案来自jdantonio,来自 https://github.com/ruby-concurrency/concurrent-ruby/issues/616


大多数应用程序不应直接使用线程池.线程池是一种用于内部使用的低级抽象.此库中的所有高级抽象(Promise,Actor等)都将作业发布到全局线程池,并且都提供异常处理.只需选择最适合您的用例的抽象并使用它.

如果您觉得需要配置自己的线程池而不是使用全局线程池,您仍然可以使用高级抽象.它们都支持:executor选项,允许您注入自定义线程池.然后,您可以使用高级抽象提供的异常处理.

如果你绝对坚持将作业直接发布到线程池而不是使用我们的高级抽象(我强烈反对),那么只需创建一个作业包装器.您可以在我们所有的高级抽象中找到作业包装器的示例,Rails ActiveJob,Sucker Punch和其他使用我们的线程池的库.“

那么使用Promises实现怎么样?
http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Promise.html
在你的情况下,它看起来像这样:

promises = []
products.each do |product|
  new_product = generate_new_prodcut

  promises << Concurrent::Promise.execute do 
    store_in_db(new_product)
  end
end

# .value will wait for the Thread to finish.
# The ! means,that all exceptions will be propagated to the main thread
# .zip will make one Promise which contains all other promises.
Concurrent::Promise.zip(*promises).value!

相关文章

validates:conclusion,:presence=>true,:inclusion=>{...
一、redis集群搭建redis3.0以前,提供了Sentinel工具来监控各...
分享一下我老师大神的人工智能教程。零基础!通俗易懂!风趣...
上一篇博文 ruby传参之引用类型 里边定义了一个方法名 mo...
一编程与编程语言 什么是编程语言? 能够被计算机所识别的表...
Ruby类和对象Ruby是一种完美的面向对象编程语言。面向对象编...