在scala中实现Producer Consumer的正确方法是什么

我尝试在 scala中实现Producer Consumer程序而不使用Queue.因为我认为Actor已经实现了“邮件队列”或其他东西,所以再次编写代码将是多余的.

我试图纯粹在Actor中编写程序.
以下是多生产者多个消费者计划.
制片人睡了一会儿,模拟做某事.消费者根本不睡觉.

但是我不知道如果我没有添加监督者演员来监视消费者,如何关闭程序,以及使用“Await”(代码中的监督者类)的Promise对象

反正有没有摆脱他们?

import akka.actor.Actor.Receive
import akka.actor._
import akka.routing._;
import akka.util._

import scala.concurrent.{Await,Promise}
import scala.concurrent.duration._

class Producer(val pool:ActorRef)(val name:String) extends Actor {

  def receive = {
    case _ =>
      while (true) {
        val sleepTime = scala.util.Random.nextInt(1000)
        Thread.sleep(sleepTime)
        println("Producer %s send food" format name)
        pool ! name
      }
  }
}

class Consumer(supervisor : ActorRef)(val name:String) extends Actor {

  var counter = 0

  def receive = {
    case s => 
      counter += 1
      println("%s eat food produced by %s" format (name,s))

      if (counter >= 10) {
        println("%s is full" format name)

        context.stop(self)
        supervisor ! 1
      }
  }
}

class Supervisor(p:Promise[String]) extends Actor {

  var r = 3

  def receive = {
    case _ =>
      r -= 1
      if (0 == r) {
        println("All consumer stopped")
        context.stop(self)
        p success ("Good")
      }
  }

}

object Try3 {

  def work(): Unit = {
    val system = ActorSystem("sys1")
    val nProducer = 5;
    val nConsumer = 3;
    val p = Promise[String]
    val supervisor = system.actorOf(Props(new Supervisor(p)));
    val arrConsumer = for ( i <-  1 to nConsumer) yield system.actorOf( Props( new Consumer(supervisor)( "Consumer %d" format (i) ) ) )
    val poolConsumer = system.actorOf(Props.empty.withRouter( RoundRobinRouter(arrConsumer) ))
    val arrProducer = for ( i <-  1 to nProducer) yield system.actorOf( Props( new Producer(poolConsumer)( "Producer %d" format (i) ) ) )

    arrProducer foreach (_ ! "start")

    Await.result(p.future,Duration.Inf)
    println("great!")
    system.shutdown
  }

  def main(args:Array[String]): Unit = {
    work()
  }
}

接收函数Producer类有一个问题,它不会被关闭,因为它没有中断条件.

我能想到的唯一方法是“向生产者本身发送信息”.
我想知道这是实现这种请求的正常方式吗?

这是修改后的代码:

class Producer(val pool:ActorRef)(val name:String) extends Actor {

  //  original implementation:
  //  def receive = {
  //    case _ =>
  //    while (true){
  //      val sleepTime = scala.util.Random.nextInt(1000)
  //      Thread.sleep(sleepTime)
  //      println("Producer %s send food" format name)
  //      pool ! name
  //    }
  //  }

  case object Loop;

  def receive = {
    case _ =>
      val sleepTime = scala.util.Random.nextInt(1000)
      Thread.sleep(sleepTime)
      println("Producer %s send food" format name)
      pool ! name
      self ! Loop   //send message to itself
  }
}

无论我的实现如何,使用Actor或Future / Promise在scala中实现Producer Consumer程序的正确方法是什么?

解决方法

你永远不应该在一个actor中阻塞(在你的情况下是Thread.sleep,while循环).阻止actor内部的一个线程从所有actor中使用的线程池中占用一个线程.即使像你这样的少量制作人也会使ActorSystem中的所有演员都被剥夺线程并使其无法使用.

而是使用Scheduler在Producer中定期安排meesage发送.

override def preStart(): Unit = {
  import scala.concurrent.duration._
  import context.dispatcher
  context.system.scheduler.schedule(
    initialDelay = 0.seconds,interval = 1.second,receiver = pool,message = name
  )
}

相关文章

共收录Twitter的14款开源软件,第1页Twitter的Emoji表情 Tw...
Java和Scala中关于==的区别Java:==比较两个变量本身的值,即...
本篇内容主要讲解“Scala怎么使用”,感兴趣的朋友不妨来看看...
这篇文章主要介绍“Scala是一种什么语言”,在日常操作中,相...
这篇文章主要介绍“Scala Trait怎么使用”,在日常操作中,相...
这篇文章主要介绍“Scala类型检查与模式匹配怎么使用”,在日...