scala - Akka DistributedPubSubMediator at-least-once delivery guarantees for publishing to a topic

标签 scala akka publish-subscribe

我需要为发布到 DistributedPubSubMediator 主题的消息提供至少一次传递保证。

我查看了 DistributedPubSubMediator.scala,可以在 TopicLike 特征(Akka 2.4.6)中看到以下内容:

trait TopicLike extends Actor {
  var subscribers = Set.empty[ActorRef]

  def defaultReceive: Receive = {
    case msg ?
      subscribers foreach { _ forward msg }
  }
}

但是我找不到任何方法来从中介检索订阅者集...如果有消息请求 GetTopicSubscribers 会将此信息公开给中介客户端,那就太好了:

mediator ! GetTopicSubscribers("mytopic")

因此在发布到主题后,发布者可以等待来自所有事件订阅者的 Ack 消息。还有其他方法可以完成类似的事情吗? 如果能以某种方式将 akka.contrib.pattern.ReliableProxy 插入到 DistributedPubSubMediator 中,那就太好了。

最佳答案

您可以让您的发布商通过 akka.cluster.pubsub.DistributedPubSubMediator.CountSubscribers 向调解员询问订阅者数量。 (“我的主题”)

然后它只需要记录从订阅者那里收到了多少 Ack 消息。

无需跟踪实际订阅者或哪些订阅者已确认,当您的 Ack 计数达到订阅者计数时,您知道他们都已收到(感谢 Akka at-most-once 交付可靠性)

但是请注意 source code 中的这条评论:

// Only for testing purposes, to poll/await replication
case object Count
final case class CountSubscribers(topic: String)

也许 CountSubscribers 不是太依赖的东西?

关于scala - Akka DistributedPubSubMediator at-least-once delivery guarantees for publishing to a topic,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37470738/

相关文章:

scala - 是否可以在 Actor 中使用 Akka 调度程序?

java - 停止(getSelf())与停止(this.getSelf())

javascript - jQuery 回调和发布/订阅

c# - 未触发 WCF 客户端关闭事件

scala - 使用 Scala 宏在树中查找所有可能的序列创建

scala - Future 何时可以返回不是从 Future 体内抛出的异常?

java - 装饰模式中为什么需要抽象装饰类?

scala - sbt/ivy 无法解析通配符 ivy 对文件系统解析器的依赖

scala - 使用 Akka Streams 动态压缩 List[Source[ByteString, NotUsed]]

rabbitmq - 对变量使用 RabbitMQ(或 pub/sub)