scala – 使用Akka Actors的OutOfMemoryError

我有一个使用RabbitMQ消息的应用程序,我正在使用Actors来处理工作.

这是我的方法

object QueueConsumer extends Queue {

  def consumeMessages = {
    setupListener(buildChannel(resultsQueueName),resultsQueueName,resultsCallback)
  }

  private def setupListener(receivingChannel: Channel,queue: String,f: (String) => Any) {
    Akka.system.scheduler.scheduleOnce(Duration(10,TimeUnit.SECONDS),Akka.system.actorOf(Props(new QueueActor(receivingChannel,queue,f))),"")
  }

}

class QueueActor(channel:Channel,queue:String,f:(String) => Any) extends Actor{

  def receive = {
    case _ => startReceiving
  }

  def startReceiving = {
    val consumer = new queueingConsumer(channel)
    channel.basicConsume(queue,false,consumer)
    while (true) {
      val delivery = consumer.nextDelivery()
      val msg = new String(delivery.getBody())
      context.actorOf(Props(new Actor {
    def receive = {
      case some: String => f(some)
    }
      })) ! msg
      channel.basicAck(delivery.getEnvelope.getDeliveryTag,false)
    }
  }

}

运行几秒钟后,它会抛出java.lang.OutOfMemoryError:超出GC开销限制.

我认为它正在发生,因为我正在为我收到的每条消息开始一个新的Actor – 所以如果我有100000条消息,它将创建10万个演员.这是一个方法还是我应该实现像’演员池’这样的东西?

任何人都知道如何在我的场景中避免OutOfMemoryError?

预先感谢.

EDIT1:

改变方法

class Queue2(json:String) extends Actor {

  def receive = {
    case x: String =>
      val envelope = MessageEnvelopeParser.toObject(x)
      val processor = ProcessQueueServiceFactory.getProcessResultsService()
      envelope.messages.foreach(message => processor.process(message))
  }

}

object Queue2 {
  def props(json: String): Props = Props(new Queue2(json))
}

class QueueActor(channel:Channel,queue:String) extends Actor {

  def receive = {
    case _ => startReceiving
  }

  def startReceiving = {
    val consumer = new queueingConsumer(channel)
    channel.basicConsume(queue,consumer)
    while (true) {
      val delivery = consumer.nextDelivery()
      val msg = new String(delivery.getBody())
      context.actorOf(Queue2.props(msg))
      channel.basicAck(delivery.getEnvelope.getDeliveryTag,false)
    }
  }
}

解决方法

你的每个消息的演员在完成后需要自己停止,否则他们会永远呆在那里.请参阅 Actor lifecyclestopping Actors上的文档.在这里,您只需要在处理完成后添加context.stop(self).

相关文章

共收录Twitter的14款开源软件,第1页Twitter的Emoji表情 Tw...
Java和Scala中关于==的区别Java:==比较两个变量本身的值,即...
本篇内容主要讲解“Scala怎么使用”,感兴趣的朋友不妨来看看...
这篇文章主要介绍“Scala是一种什么语言”,在日常操作中,相...
这篇文章主要介绍“Scala Trait怎么使用”,在日常操作中,相...
这篇文章主要介绍“Scala类型检查与模式匹配怎么使用”,在日...