scala - 使用 Akka Actors 处理多个 TCP 连接

标签 scala tcp akka actor

我正在尝试使用 akka 设置一个简单的 TCP 服务器应该允许多个客户端同时连接的参与者。我将我的问题简化为以下简单程序:

package actorfail
import akka.actor._, akka.io._, akka.util._
import scala.collection.mutable._
import java.net._

case class Foo()

class ConnHandler(conn: ActorRef) extends Actor {
  def receive = {
    case Foo() => conn ! Tcp.Write(ByteString("foo\n"))
  }
}

class Server(conns: ArrayBuffer[ActorRef]) extends Actor {
  import context.system
  println("Listing on 127.0.0.1:9191")
  IO(Tcp) ! Tcp.Bind(self, new InetSocketAddress("127.0.0.1", 9191))
  def receive = {
    case Tcp.Connected(remote, local) =>
      val handler = context.actorOf(Props(new ConnHandler(sender)))
      sender ! Tcp.Register(handler)
      conns.append(handler)
  }
}

object Main {
  def main(args: Array[String]) {
    implicit val system = ActorSystem("Test")
    val conns = new ArrayBuffer[ActorRef]()
    val server = system.actorOf(Props(new Server(conns)))
    while (true)  {
      println(s"Sending some foos")
      for (c <- conns) c ! Foo()
      Thread.sleep(1000)
    }
  }
}

它绑定(bind)到 localhost:9191 并接受多个连接,将连接处理程序添加到全局数组并定期向每个连接发送字符串 "foo"。现在,当我尝试同时连接多个客户端时,只有第一个获得“foo”。当我打开第二个连接时,它不会发送任何 foo,而是收到以下类型的日志消息:

Sending some foos
[INFO] [03/27/2015 21:24:07.331] [Test-akka.actor.default-dispatcher-6] [akka://Test/deadLetters] Message [akka.io.Tcp$Write] from Actor[akka://Test/user/$a/$b#-308726290] to Actor[akka://Test/deadLetters] was not delivered. [7] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

我知道这意味着我们尝试向其发送 Tcp.Write 命令的目标参与者不再接受消息。但这是为什么呢?你能帮我理解根本问题吗?我怎样才能使它工作?

最佳答案

上面的代码有两个问题:

  • 在 actor 消息中发送可变状态并以非线程安全的方式改变它
  • 在 Props 中包含不稳定的引用

在我详细说明之前,请考虑阅读文档,herehere ,这一切都涵盖在那里。

可变消息

ArrayBuffer 不是线程安全的,但是您将它从主例程传递给不同的参与者,然后他们独立(同时)修改它。这将导致更新丢失或数据结构本身损坏。另一方面,如果没有适当的同步,就不能保证主线程会看到修改,因为编译器原则上可以确定缓冲区在 while 循环中没有改变并优化代码相应地。

Actor 不依赖于共享的可变状态,而是只发送消息。在这种情况下,解决方案是将 while 循环提升到一个 actor 中(但在一秒钟后安排一条消息到 self 而不是阻塞 Thread.sleep(1000 ) 调用)。然后连接处理程序只需要为这个 foo 发送者 actor 传递 ActorRef,他们会向它发送一条消息来注册自己,然后那个 actor 保持事件列表它内部的连接封装了作用域。这样做的好处是您可以使用 DeathWatch 在连接终止时删除连接。

Props 中的不稳定引用

有问题的代码:Props(new ConnHandler(sender))

Props 是从一个 actor 工厂构造的,在这种情况下它被当作一个名字参数;整个 new 表达式将在以后计算,每当这样的 actor 被初始化时——可能在不同的线程上。这意味着 sender 稍后也会从该执行上下文中进行评估,因此它可能是 deadLetters(如果父 actor 当前未运行 - 如果它正在运行, sender 可能会完全指向错误的参与者)。

这里的解决方案记录在案here .

关于scala - 使用 Akka Actors 处理多个 TCP 连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29309657/

相关文章:

http - 将套接字与进程相关联

IOS - 如何在后台运行应用程序?

scala - 如何使用 akka 连接到 Unix 套接字?

Scala理解能力的表现

scala - Play 框架解析查询参数中的分号

C# TCP 隧道发送大量空值

scala - 不同的 Scala Actor 实现概述

scala - 在 Akka Streams 中将 Sink 转换为 Graph(SinkShape)?

sql - 带动态最后的 Spark 高级窗口

scala - 在scala中,如何将对象的值转换为Map[String, String]?