scala - Akka集群分片: Can Entry actors have dynamic props

标签 scala akka akka-cluster

Akka Cluster-Sharding 看起来与我必须跨 Akka 节点创建有状态持久 Actor 的单个实例的用例非常匹配。

我不清楚是否有可能有一个需要参数来构造它的 Entry actor 类型。或者也许我需要重新考虑 Entry actor 如何获取此信息。

Object Account {
  def apply(region: String, accountId: String): Props = Props(new Account(region, accountId))
}

class Account(val region: String, val accountId: String) extends Actor with PersistentActor { ... }

ClusterSharding.start 采用单个 Props 实例来创建所有 Entry Actor。

来自akka cluster-sharding :

val counterRegion: ActorRef = ClusterSharding(system).start(
  typeName = "Counter",
  entryProps = Some(Props[Counter]),
  idExtractor = idExtractor,
  shardResolver = shardResolver)

然后它根据您定义 idExtractor 的方式解析接收消息的 Entry actor。从 shard 的源代码中可以看出,它使用 id 作为给定 Entry actor 实例的名称:

def getEntry(id: EntryId): ActorRef = {
val name = URLEncoder.encode(id, "utf-8")
context.child(name).getOrElse {
  log.debug("Starting entry [{}] in shard [{}]", id, shardId)

  val a = context.watch(context.actorOf(entryProps, name))
  idByRef = idByRef.updated(a, id)
  refById = refById.updated(id, a)
  state = state.copy(state.entries + id)
  a
}

}

看来我应该让我的 Entry actor 通过给定的名称找出它的区域和 accountId,尽管现在这确实感觉有点老套,因为我将从字符串中解析它而不是直接获取值。这是我最好的选择吗?

最佳答案

我的情况和你很相似。我没有确切的答案,但我可以与您和读者分享我所做/尝试/思考的事情。

选项 1) 正如您所提到的,您可以从命名内容和解析路径的方式中提取 id、分片和区域信息。好处是 a) 这很容易做到。 缺点是 a) Akka 将 actor 路径编码为 UTF-8,因此,如果您使用非标准 url 字符(例如 || 或 w/e)作为分隔符,则需要首先从 utf8 对其进行解码。请注意,Akka utf8 内部是硬编码为编码方法,无法像在函数中那样提取编码格式,因此如果明天 akka 发生更改,您也必须调整代码。 b)你的系统不再保留同态(你的意思是“感觉有点hacky”)。这意味着您正在增加这样的风险:您的数据有一天可能会包含您的信息分隔符字符串作为有意义的数据,并且您的系统可能会陷入困惑。

选项 2) 如果你的 actor 不存在,分片将会生成它。因此,您可以强制代码始终向未初始化的参与者发送初始化消息,其中包含构造函数参数。你的分片 Actor 将拥有这样的东西:

val par1: Option[param1Type] = None

def receive = {
    case init(par1value) => par1 = Some(par1value)
    case query(par1) => sender ! par1
}

从您所在的区域访问参与者中,您始终可以先发送查询消息,然后如果返回为 None,则发送初始化消息。这假设您的区域访问 Actor 不维护已初始化 Actor 的列表,在这种情况下,您可以使用 init 生成,然后正常使用它们。 好处是 a) 很优雅 b)“感觉”正确

缺点:a)需要 2 条消息(如果您不维护初始化参与者的列表)

选项 3) 此选项已经过测试,但不起作用。我将其留在这里,以免人们浪费时间尝试同样的事情。 我不知道这是否有效,我还没有测试过,因为我在生产中使用这个场景,有特殊的限制,并且不允许使用奇特的东西 ^_^ 但请随意尝试,请通过 pm 或评论告诉我! 基本上,您可以从以下位置开始您的区域

val counterRegion: ActorRef = ClusterSharding(system).start(
  typeName = "Counter",
  entryProps = Some(Props[Counter]),
  idExtractor = idExtractor,
  shardResolver = shardResolver)

如果您在您的区域创建 Actor 中执行以下操作,该怎么办:

var providedPar1 = v1
def providePar1 = providedPar1

val counterRegion: ActorRef = ClusterSharding(system).start(
  typeName = "Counter",
  entryProps = Some(Props(classOf[Counter], providePar1),
  idExtractor = idExtractor,
  shardResolver = shardResolver)

然后你为每个创建更改providedPar1的值?这样做的缺点是,在它起作用的选项中,您需要避免更改providedPar1的值,直到您100%确定 Actor 已被创建,否则您可能会冒访问新的错误值的风险(是的) ,竞争条件!)

一般情况下,您最好选择选项 2,但在大多数情况下,选项 1 引入的风险很小,并且考虑到简单性(和性能)优势,您可以适本地减轻它们。

希望这篇咆哮能有所帮助,如果您尝试 3 种方法,请告诉我!

关于scala - Akka集群分片: Can Entry actors have dynamic props,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26475491/

相关文章:

performance - 以更实用的方式 (scalaz) 使用 Scala 会导致性能/可维护性损失吗?

scala - 无法连接到在 docker 中运行的 tcp 端口

java - 如何将基于 Actor 的源与 Akka Graph 结合使用?

scala - 使用 Play 框架设置 Akka 集群

scala - `<<=` 在 SBT 中是什么意思?

scala - 使用不同选项的 akka 流物化值

scala - 即使没有匹配项,如何始终在接收()中调用方法

akka - 在Akka.NET中,(使用Akka.Cluster)如何配置多个种子节点(Lighthouse)相互了解?

scala - 如何使用 Lagom Kafka Message Broker API 安全地跳过消息?

akka - 通过http端点关闭akka集群中无法访问的节点