ruby – 我是否以正确的方式使用eventmachine?

我使用 ruby-smpp和redis来实现基于队列的后台工作程序来发送SMPP消息.

我想知道我是否以正确的方式使用eventmachine.它有效,但感觉不对.

#!/usr/bin/env ruby

# Sample SMS gateway that can receive MOs (mobile originated messages) and
# DRs (delivery reports),and send MTs (mobile terminated messages).
# MTs are,in the name of simplicity,entered on the command line in the format
# <sender> <receiver> <message body>
# MOs and DRs will be dumped to standard out.

require 'smpp'
require 'redis/connection/hiredis'
require 'redis'
require 'yajl'
require 'time'

LOGFILE = File.dirname(__FILE__) + "/sms_gateway.log"
PIDFILE = File.dirname(__FILE__) + '/worker_test.pid'
Smpp::Base.logger = Logger.new(LOGFILE)
#Smpp::Base.logger.level = Logger::WARN

REdis = Redis.new

class MbloxGateway

  # MT id counter. 
  @@mt_id = 0

  # expose SMPP transceiver's send_mt method
  def self.send_mt(sender,receiver,body)

    if sender =~ /[a-z]+/i
      source_addr_ton = 5
    else
      source_addr_ton = 2
    end

    @@mt_id += 1
    @@tx.send_mt(('smpp' + @@mt_id.to_s),sender,body,{
      :source_addr_ton => source_addr_ton
    #   :service_type => 1,#   :source_addr_ton => 5,#   :source_addr_npi => 0,#   :dest_addr_ton => 2,#   :dest_addr_npi => 1,#   :esm_class => 3,#   :protocol_id => 0,#   :priority_flag => 0,#   :schedule_delivery_time => nil,#   :validity_period => nil,#   :registered_delivery=> 1,#   :replace_if_present_flag => 0,#   :data_coding => 0,#   :sm_default_msg_id => 0 
    #     
    })
  end

  def logger
    Smpp::Base.logger
  end

  def start(config)
    # Write this workers pid to a file
    File.open(PIDFILE,'w') { |f| f << Process.pid }
    # The transceiver sends MT messages to the SMSC. It needs a storage with Hash-like
    # semantics to map SMSC message IDs to your own message IDs.
    pdr_storage = {} 

    # Run EventMachine in loop so we can reconnect when the SMSC drops our connection.
    loop do
      EventMachine::run do             
        @@tx = EventMachine::connect(
          config[:host],config[:port],Smpp::Transceiver,config,self    # delegate that will receive callbacks on MOs and DRs and other events
        )

      # Let the connection start before we check for messages
      EM.add_timer(3) do
        # Maybe there is some better way to do this. IDK,But it works!
        EM.defer do
          loop do
            # Pop a message
            message = REdis.lpop 'messages:send:queue'
            if message # If there is a message. Process it and check the queue again
              message = Yajl::Parser.parse(message,:check_utf8 => false) # Parse the message from Json to Ruby hash
              if !message['send_after'] or (message['send_after'] and Time.parse(message['send_after']) < Time.Now)

                self.class.send_mt(message['sender'],message['receiver'],message['body']) # Send the message
                REdis.publish 'log:messages',"#{message['sender']} -> #{message['receiver']}: #{message['body']}" # Push the message to the redis queue so we can listen to the channel
              else
                REdis.lpush 'messages:queue',Yajl::Encoder.encode(message)
              end
            else # If there is no message. Sleep for a second
              sleep 1
            end
          end
        end
      end
    end
      sleep 2
    end
  end

  # ruby-smpp delegate methods 

  def mo_received(transceiver,pdu)
    logger.info "Delegate: mo_received: from #{pdu.source_addr} to #{pdu.destination_addr}: #{pdu.short_message}"
  end

  def delivery_report_received(transceiver,pdu)
    logger.info "Delegate: delivery_report_received: ref #{pdu.msg_reference} stat #{pdu.stat}"
  end

  def message_accepted(transceiver,mt_message_id,pdu)
    logger.info "Delegate: message_accepted: id #{mt_message_id} smsc ref id: #{pdu.message_id}"
  end

  def message_rejected(transceiver,pdu)
    logger.info "Delegate: message_rejected: id #{mt_message_id} smsc ref id: #{pdu.message_id}"
  end

  def bound(transceiver)
    logger.info "Delegate: transceiver bound"
  end

  def unbound(transceiver)  
    logger.info "Delegate: transceiver unbound"
    EventMachine::stop_event_loop
  end

end

# Start the Gateway
begin   
  puts "Starting SMS Gateway. Please check the log at #{LOGFILE}"  

  # SMPP properties. These parameters work well with the Logica SMPP simulator.
  # Consult the SMPP spec or your mobile operator for the correct settings of 
  # the other properties.
  config = {
    :host => 'server.com',:port => 3217,:system_id => 'user',:password => 'password',:system_type => 'type',# default given according to SMPP 3.4 Spec
    :interface_version => 52,:source_ton  => 0,:source_npi => 1,:destination_ton => 1,:destination_npi => 1,:source_address_range => '',:destination_address_range => '',:enquire_link_delay_secs => 10
  }  
  gw = MbloxGateway.new
  gw.start(config)

rescue Exception => ex
  puts "Exception in SMS Gateway: #{ex} at #{ex.backtrace.join("\n")}"
end

解决方法

一些简单的步骤使这个代码更多EventMachine-ish:

>摆脱阻塞的Redis驱动程序,使用em-hiredis
>停止使用延迟.使用Redis驱动程序将工作推送到线程将使事情变得更糟,因为它依赖于它正在使用的套接字周围的锁.
>摆脱add_timer(3)
>摆脱内部循环,通过使用EM.next_tick重新安排下一个事件循环的块来替换它.外部有点不必要.您也不应该在EM.run周围循环,通过调用@@ tx.reconnect,通过在未绑定方法中重新连接而不是停止并重新启动事件循环来正确处理断开连接更清晰.
>不要睡觉,等一下. EventMachine会告诉您何时网络套接字上有新内容.

以下是EventMachine的核心代码如何与一些改进相似:

def start(config)
  File.open(PIDFILE,'w') { |f| f << Process.pid }
  pdr_storage = {} 

  EventMachine::run do
    @@tx = EventMachine::connect(
      config[:host],self
    )
    REdis = EM::Hiredis.connect

    pop_message = lambda do
      REdis.lpop 'messages:send:queue' do |message|
        if message # If there is a message. Process it and check the queue again
          message = Yajl::Parser.parse(message,:check_utf8 => false) # Parse the message from Json to Ruby hash
          if !message['send_after'] or (message['send_after'] and Time.parse(message['send_after']) < Time.Now)
            self.class.send_mt(message['sender'],message['body'])
            REdis.publish 'log:messages',"#{message['sender']} -> #{message['receiver']}: #{message['body']}"
          else
            REdis.lpush 'messages:queue',Yajl::Encoder.encode(message)
          end
        end
        EM.next_tick &pop_message
      end
    end
  end
end

不完美,也可以使用一些清理,但这更像是以EventMachine方式应该是这样.没有睡眠,尽可能避免使用延迟,并且不使用可能阻塞的网络驱动程序,通过在下一个反应器循环上重新安排事物来实现传统循环.就Redis而言,差异并不是那么大,但它更像是EventMachine-y这样的方式.

希望这可以帮助.如果您还有疑问,请继续解释.

相关文章

validates:conclusion,:presence=>true,:inclusion=>{...
一、redis集群搭建redis3.0以前,提供了Sentinel工具来监控各...
分享一下我老师大神的人工智能教程。零基础!通俗易懂!风趣...
上一篇博文 ruby传参之引用类型 里边定义了一个方法名 mo...
一编程与编程语言 什么是编程语言? 能够被计算机所识别的表...
Ruby类和对象Ruby是一种完美的面向对象编程语言。面向对象编...