scala - 从线程模型到参与者模型的转变

标签 scala concurrency refactoring actor

尝试掌握如何从参与者而不是线程的角度进行思考。我对以下用例有点困惑:

Consider a system that has a producer process that creates work (e.g. by reading data from a file), and a number of worker processes that consume the work (e.g. by parsing the data and writing it to a database). The rates at which work is produced and consumed can vary, and the system should remain robust to this. For example, if the workers can't keep up, the producer should detect this and eventually slow down or wait.

这很容易用线程实现:

val producer:Iterator[Work] = createProducer()
val queue = new LinkedBlockingQueue[Work](QUEUE_SIZE)
val workers = (0 until NUM_WORKERS) map { i =>
  new Thread() { 
    override def run() = {
      while (true) {
        try {
          // take next unit of work, waiting if necessary
          val work = queue.take()
          process(work)
        }
        catch {
          case e:InterruptedException => return
        }
      }
    }
  }
}

// start the workers
workers.foreach(_.start())

while (producer.hasNext) {
  val work = producer.next()
  // add new unit of work, waiting if necessary
  queue.put(work)
}

while (!queue.isEmpty) {
  // wait until queue is drained
  queue.wait()
}

// stop the workers
workers.foreach(_.interrupt())

这个模型并没有什么问题,而且我以前也成功地使用过它。这个示例可能过于冗长,因为使用 Executor 或 CompletionService 非常适合此任务。但我喜欢 Actor 抽象,并且认为在很多情况下它更容易推理。有没有办法使用参与者重写这个示例,特别是确保不存在缓冲区溢出(例如邮箱已满、消息丢失等)?

最佳答案

因为参与者“离线”处理消息(即消息的消费与消息的接收无关),所以很难看出如何精确模拟“生产者等待消费者 catch ”。

我唯一能想到的是消费者向生产者参与者请求工作(使用reply):

case object MoreWorkPlease
class Consumer(prod : Producer) extends Actor {
  def act = {
    prod ! MoreWorkPlease
    loop {
      react {
        case Work(payload) => doStuff(payload); reply(MoreWorkPlease)
      }
    }
  }
}

class Producer extends Actor {
  def act = loop {
    react {
      case MoreWorkPlease => reply(Work(getNextItem))
    }
  }
}

当然,这并不完美,因为生产者不会“向前读取”,并且仅在消费者准备好时才开始工作。用法如下:

val prod = new Producer
(1 to NUM_ACTORS).map(new Consumer(prod)).foreach(_.start())
prod.start()

关于scala - 从线程模型到参与者模型的转变,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/4159340/

相关文章:

scala - Spark cassandra 连接器在独立 Spark 集群中不起作用

java - Play 2.4.6 框架中的表单未绑定(bind)

scala - 将现有变量标记为隐式方法的候选者

coding-style - 有没有办法清理这段 Go 代码?

json - 带有空字符串验证的 Play2.1 JSON 格式

java - 我应该在 ReferenceQueue 上同步吗?

.net - HttpClient 同时使用是否安全?

ios - iOS 10 核心数据保存错误 : Unresolved error Error Domain=NSCocoaErrorDomain Code=133020

javascript - 从对象生成的 AngularJS 选择框

visual-studio - 如何使用 ReSharper 重新格式化命名样式?