Rails作业因使用MQTT的Sidekiq错误而中断管道

问题描述

Sidekiq尝试处理Rails作业时出现错误。 我现在正在将网络应用迁移到Google Cloud Service,在那里我设置了Redis服务器和代理mqtt。

活动作业如下:

 def publish_update
topic = "tournament_team_category/#{@round_match.round.tournament_team_category.id}"
payload = {
  round_match: {
    id: @round_match.id,started_at: @round_match.started_at,ended_at: @round_match.ended_at,first_round_match_team: {
      id: @round_match.first_round_match_team.id,goals: @round_match.first_round_match_team.goals
    },second_round_match_team: {
      id: @round_match.second_round_match_team.id,goals: @round_match.second_round_match_team.goals
    }
  }
}

# Todo: Improve exception manage
begin
  MQTT_CLIENT.connect
  MQTT_CLIENT.publish(topic,payload.to_json,false,1)
  MQTT_CLIENT.disconnect
rescue MQTT::NotConnectedException => _e
  return nil
rescue MQTT::ProtocolException => _e
  return nil
end
end

Sidekiq尝试执行此作业时,服务器向我显示错误

2020-10-01 13:22:39 worker[20201001t140958]  2020-10-01T13:22:39.897Z 1 TID-gtixo1hip WARN: Errno::EPIPE: broken pipe
2020-10-01 13:22:39 worker[20201001t140958]  2020-10-01T13:22:39.897Z 1 TID-gtixo1hip WARN: /app/vendor/bundle/ruby/2.6.0/gems/mqtt-0.5.0/lib/mqtt/client.rb:556:in `write'
2020-10-01 13:22:39 worker[20201001t140958]  /app/vendor/bundle/ruby/2.6.0/gems/mqtt-0.5.0/lib/mqtt/client.rb:556:in `block in send_packet'

mqtt client.rb第556行代码是:

# Send a packet to server
   def send_packet(data)
# Raise exception if we aren't connected
   raise MQTT::NotConnectedException if not connected?

# Only allow one thread to write to socket at a time
   @write_semaphore.synchronize do
   @socket.write(data.to_s)
 end
end

我找不到任何解决方案来解决此问题,而我的工作仍然陷入困境。 有人遇到过这种问题吗?

谢谢

解决方法

看起来您的MQTT客户端使用不是线程安全的,您需要这样做:

https://github.com/mperham/sidekiq/wiki/Advanced-Options#connection-pooling