我尝试在
scala中实现Producer Consumer程序而不使用Queue.因为我认为Actor已经实现了“邮件队列”或其他东西,所以再次编写代码将是多余的.
我试图纯粹在Actor中编写程序.
以下是多生产者多个消费者计划.
制片人睡了一会儿,模拟做某事.消费者根本不睡觉.
但是我不知道如果我没有添加监督者演员来监视消费者,如何关闭程序,以及使用“Await”(代码中的监督者类)的Promise对象
反正有没有摆脱他们?
import akka.actor.Actor.Receive import akka.actor._ import akka.routing._; import akka.util._ import scala.concurrent.{Await,Promise} import scala.concurrent.duration._ class Producer(val pool:ActorRef)(val name:String) extends Actor { def receive = { case _ => while (true) { val sleepTime = scala.util.Random.nextInt(1000) Thread.sleep(sleepTime) println("Producer %s send food" format name) pool ! name } } } class Consumer(supervisor : ActorRef)(val name:String) extends Actor { var counter = 0 def receive = { case s => counter += 1 println("%s eat food produced by %s" format (name,s)) if (counter >= 10) { println("%s is full" format name) context.stop(self) supervisor ! 1 } } } class Supervisor(p:Promise[String]) extends Actor { var r = 3 def receive = { case _ => r -= 1 if (0 == r) { println("All consumer stopped") context.stop(self) p success ("Good") } } } object Try3 { def work(): Unit = { val system = ActorSystem("sys1") val nProducer = 5; val nConsumer = 3; val p = Promise[String] val supervisor = system.actorOf(Props(new Supervisor(p))); val arrConsumer = for ( i <- 1 to nConsumer) yield system.actorOf( Props( new Consumer(supervisor)( "Consumer %d" format (i) ) ) ) val poolConsumer = system.actorOf(Props.empty.withRouter( RoundRobinRouter(arrConsumer) )) val arrProducer = for ( i <- 1 to nProducer) yield system.actorOf( Props( new Producer(poolConsumer)( "Producer %d" format (i) ) ) ) arrProducer foreach (_ ! "start") Await.result(p.future,Duration.Inf) println("great!") system.shutdown } def main(args:Array[String]): Unit = { work() } }
接收函数Producer类有一个问题,它不会被关闭,因为它没有中断条件.
我能想到的唯一方法是“向生产者本身发送信息”.
我想知道这是实现这种请求的正常方式吗?
这是修改后的代码:
class Producer(val pool:ActorRef)(val name:String) extends Actor { // original implementation: // def receive = { // case _ => // while (true){ // val sleepTime = scala.util.Random.nextInt(1000) // Thread.sleep(sleepTime) // println("Producer %s send food" format name) // pool ! name // } // } case object Loop; def receive = { case _ => val sleepTime = scala.util.Random.nextInt(1000) Thread.sleep(sleepTime) println("Producer %s send food" format name) pool ! name self ! Loop //send message to itself } }
无论我的实现如何,使用Actor或Future / Promise在scala中实现Producer Consumer程序的正确方法是什么?
解决方法
你永远不应该在一个actor中阻塞(在你的情况下是Thread.sleep,while循环).阻止actor内部的一个线程从所有actor中使用的线程池中占用一个线程.即使像你这样的少量制作人也会使ActorSystem中的所有演员都被剥夺线程并使其无法使用.
而是使用Scheduler在Producer中定期安排meesage发送.
override def preStart(): Unit = { import scala.concurrent.duration._ import context.dispatcher context.system.scheduler.schedule( initialDelay = 0.seconds,interval = 1.second,receiver = pool,message = name ) }