scala - 在scala中实现生产者消费者的正确方法是什么

标签 scala akka actor producer-consumer

我尝试在 scala 中实现一个生产者消费者程序,而不使用队列。因为我觉得Actor已经实现了“邮件队列”或者其他什么东西,所以再写一遍代码就显得多余了。

我尝试纯粹用Actor来编写程序。 下面是一个多生产者多消费者程序。 生产者为了模拟做某事而休眠一段时间。消费者根本不 sleep 。

但是,如果我不添加一个supervisor actor来监视消费者,以及一个使用“Await”的Promise对象(代码中的Supervisor类),我不知道如何关闭程序

有办法摆脱它们吗?

import akka.actor.Actor.Receive
import akka.actor._
import akka.routing._;
import akka.util._

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

class Producer(val pool:ActorRef)(val name:String) extends Actor {

  def receive = {
    case _ =>
      while (true) {
        val sleepTime = scala.util.Random.nextInt(1000)
        Thread.sleep(sleepTime)
        println("Producer %s send food" format name)
        pool ! name
      }
  }
}

class Consumer(supervisor : ActorRef)(val name:String) extends Actor {

  var counter = 0

  def receive = {
    case s => 
      counter += 1
      println("%s eat food produced by %s" format (name,s))

      if (counter >= 10) {
        println("%s is full" format name)

        context.stop(self)
        supervisor ! 1
      }
  }
}

class Supervisor(p:Promise[String]) extends Actor {

  var r = 3

  def receive = {
    case _ =>
      r -= 1
      if (0 == r) {
        println("All consumer stopped")
        context.stop(self)
        p success ("Good")
      }
  }

}

object Try3 {

  def work(): Unit = {
    val system = ActorSystem("sys1")
    val nProducer = 5;
    val nConsumer = 3;
    val p = Promise[String]
    val supervisor = system.actorOf(Props(new Supervisor(p)));
    val arrConsumer = for ( i <-  1 to nConsumer) yield system.actorOf( Props( new Consumer(supervisor)( "Consumer %d" format (i) ) ) )
    val poolConsumer = system.actorOf(Props.empty.withRouter( RoundRobinRouter(arrConsumer) ))
    val arrProducer = for ( i <-  1 to nProducer) yield system.actorOf( Props( new Producer(poolConsumer)( "Producer %d" format (i) ) ) )

    arrProducer foreach (_ ! "start")

    Await.result(p.future,Duration.Inf)
    println("great!")
    system.shutdown
  }

  def main(args:Array[String]): Unit = {
    work()
  }
}

Producer类的接收函数有一个问题,就是不会被关闭,因为它是while而不是break条件。

我能想到的唯一方法是“向生产者本身发送消息”。 我想知道这是实现这种请求的正常方法吗?

修改后的代码如下:

class Producer(val pool:ActorRef)(val name:String) extends Actor {

  //  original implementation:
  //  def receive = {
  //    case _ =>
  //    while (true){
  //      val sleepTime = scala.util.Random.nextInt(1000)
  //      Thread.sleep(sleepTime)
  //      println("Producer %s send food" format name)
  //      pool ! name
  //    }
  //  }

  case object Loop;

  def receive = {
    case _ =>
      val sleepTime = scala.util.Random.nextInt(1000)
      Thread.sleep(sleepTime)
      println("Producer %s send food" format name)
      pool ! name
      self ! Loop   //send message to itself
  }
}

无论我的实现如何,在 scala 中使用 Actor 或 Future/Promise 实现生产者消费者程序的正确方法是什么?

最佳答案

您永远不应该在 Actor 内部阻塞(在您的情况下为 Thread.sleep,while 循环)。 Actor 内部的阻塞会从所有 Actor 之间使用的线程池中占用一个线程。即使像您这样少量的 Producer 也会使 ActorSystem 中的所有 Actor 失去线程并使它们无法使用。

而是使用 Scheduler 来安排在 Producer 中定期发送消息。

override def preStart(): Unit = {
  import scala.concurrent.duration._
  import context.dispatcher
  context.system.scheduler.schedule(
    initialDelay = 0.seconds,
    interval = 1.second,
    receiver = pool,
    message = name
  )
}

关于scala - 在scala中实现生产者消费者的正确方法是什么,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26293368/

相关文章:

scala - Akka 流。分组,聚合一段时间并发出结果

scala - 访问 trait 中的现有 ActorSystem

scala - 如何调试akka关联进程?

scala - 如何通过 sbt compile 抑制警告

scala - REST Lift 项目中的 SSL,从哪里开始?

Scala Akka Actors act() 函数

c# - 在单独的进程中部署 Akka.net 参与者

scala - Untyped vs TypedActors - 为什么要使用 untyped?

groovy - 发送给 gpars actor 的消息是否被复制或引用?

scala - 将导入标记为用于 IntelliJ