我正在尝试使用 akka 设置一个简单的 TCP 服务器应该允许多个客户端同时连接的参与者。我将我的问题简化为以下简单程序:
package actorfail
import akka.actor._, akka.io._, akka.util._
import scala.collection.mutable._
import java.net._
case class Foo()
class ConnHandler(conn: ActorRef) extends Actor {
def receive = {
case Foo() => conn ! Tcp.Write(ByteString("foo\n"))
}
}
class Server(conns: ArrayBuffer[ActorRef]) extends Actor {
import context.system
println("Listing on 127.0.0.1:9191")
IO(Tcp) ! Tcp.Bind(self, new InetSocketAddress("127.0.0.1", 9191))
def receive = {
case Tcp.Connected(remote, local) =>
val handler = context.actorOf(Props(new ConnHandler(sender)))
sender ! Tcp.Register(handler)
conns.append(handler)
}
}
object Main {
def main(args: Array[String]) {
implicit val system = ActorSystem("Test")
val conns = new ArrayBuffer[ActorRef]()
val server = system.actorOf(Props(new Server(conns)))
while (true) {
println(s"Sending some foos")
for (c <- conns) c ! Foo()
Thread.sleep(1000)
}
}
}
它绑定(bind)到 localhost:9191 并接受多个连接,将连接处理程序添加到全局数组并定期向每个连接发送字符串 "foo"
。现在,当我尝试同时连接多个客户端时,只有第一个获得“foo”。当我打开第二个连接时,它不会发送任何 foo,而是收到以下类型的日志消息:
Sending some foos
[INFO] [03/27/2015 21:24:07.331] [Test-akka.actor.default-dispatcher-6] [akka://Test/deadLetters] Message [akka.io.Tcp$Write] from Actor[akka://Test/user/$a/$b#-308726290] to Actor[akka://Test/deadLetters] was not delivered. [7] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
我知道这意味着我们尝试向其发送 Tcp.Write
命令的目标参与者不再接受消息。但这是为什么呢?你能帮我理解根本问题吗?我怎样才能使它工作?
最佳答案
上面的代码有两个问题:
- 在 actor 消息中发送可变状态并以非线程安全的方式改变它
- 在 Props 中包含不稳定的引用
在我详细说明之前,请考虑阅读文档,here和 here ,这一切都涵盖在那里。
可变消息
ArrayBuffer 不是线程安全的,但是您将它从主例程传递给不同的参与者,然后他们独立(同时)修改它。这将导致更新丢失或数据结构本身损坏。另一方面,如果没有适当的同步,就不能保证主线程会看到修改,因为编译器原则上可以确定缓冲区在 while
循环中没有改变并优化代码相应地。
Actor 不依赖于共享的可变状态,而是只发送消息。在这种情况下,解决方案是将 while
循环提升到一个 actor 中(但在一秒钟后安排一条消息到 self
而不是阻塞 Thread.sleep(1000 )
调用)。然后连接处理程序只需要为这个 foo
发送者 actor 传递 ActorRef
,他们会向它发送一条消息来注册自己,然后那个 actor 保持事件列表它内部的连接封装了作用域。这样做的好处是您可以使用 DeathWatch 在连接终止时删除连接。
Props 中的不稳定引用
有问题的代码:Props(new ConnHandler(sender))
Props 是从一个 actor 工厂构造的,在这种情况下它被当作一个名字参数;整个 new
表达式将在以后计算,每当这样的 actor 被初始化时——可能在不同的线程上。这意味着 sender
稍后也会从该执行上下文中进行评估,因此它可能是 deadLetters
(如果父 actor 当前未运行 - 如果它正在运行, sender
可能会完全指向错误的参与者)。
这里的解决方案记录在案here .
关于scala - 使用 Akka Actors 处理多个 TCP 连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29309657/