java - Java消息调度系统设计

标签 java multithreading performance akka

我正在为以下用例寻找轻量级且高效的解决方案:

  • 网关模块接收资源并交付给不同的接受者。
  • 为每个接受器排队的资源(按到达顺序)。
  • 清除进程会扫描这些队列,如果资源可供某个接受器使用,那么他会将它们捆绑在某个标签(唯一 ID)下,并发送新 bundle 可用的通知。

系统特点:

  • 接受者的数量是动态的。
  • 一个 bundle 中的资源数量没有限制。

Message Dispatching System

该模块将在 Java 7 下的 Tomcat 7 中使用(非集群)。

我考虑了以下解决方案:

  1. JMS - 每个接受器的动态队列配置,是否可以消耗队列中的所有可用消息?每个队列的线程配置(不可扩展)?
  2. AKKA Actor 。没有找到合适的使用模式。
  3. 朴素的纯 Java 实现,其中队列将由一个线程扫描(循环)。

我认为这是讨论此问题的可用解决方案的正确场所。 请在考虑以下几点时分享您的想法:

  • 合适的第三方框架。
  • 资源队列可扩展扫描。

提前致谢。

最佳答案

您可以使用各种技术,例如:

但出于高可用性可扩展性的原因,您应该使用Akka

Akka

实现的起点将是 Akka 中内置的一致性哈希路由算法 - 简而言之,这种类型的路由逻辑根据提供的 key 选择一致的路由。与您的问题描述相比的路线接受者

路由器参与者有两种不同的风格,这为您提供了在基础设施中部署新接受器的灵活机制。

  • 池 - 路由器将路由创建为子参与者,并在它们终止时将其从路由器中删除。

  • 组 - 路由参与者在路由器外部创建,路由器使用参与者选择将消息发送到指定路径,而不监视终止。

首先请阅读Akka路由文档,以更好地了解Akka框架中的路由实现:

您还可以查看这篇有关可扩展和高可用性系统设计的文章:

Q1 Actor 是否有可能知道他的路线(他的哈希 key )?

Actor 可能知道当前处理哪个键,因为它可能只是消息的一部分 - 但您不应该基于此键构建跨消息逻辑/状态。

消息:

import akka.routing.ConsistentHashingRouter.ConsistentHashable
  class Message(key : String) extends ConsistentHashable with Serializable {
      override def consistentHashKey(): AnyRef = key
  }

Actor :

  import akka.actor.{Actor, ActorLogging}

  class EchoActor extends Actor with ActorLogging {

    log.info("Actor created {}", self.path.name)

    def receive = {
      case message: Message =>
        log.info("Received message {} in actor {}", message.consistentHashKey(),             self.path.name)
      case _ => log.error("Received unsupported message");
    }
  }

Q2 Actor 可以管理邮箱以外的状态吗?

Actor 状态只能通过它们之间发送的消息进行更改。

如果您将初始化包含对经典 java/spring/.. bean 的引用的 actor,它将能够与非 actor 世界/状态进行交互,例如。 dao 层,但这种类型的集成应该尽可能受到限制,并被视为反模式。

Q3 有没有办法使用防碰撞的配置?

作为 API 使用者,您需要自己定义防碰撞模型,但 Akka 再次提供了执行此操作所需的基础设施。

  1. 在大多数情况下, key 将是域的一部分,例如。拍卖id、客户id

  2. 如果需要按需生成 key ,您可以使用 ClusterSingletonPersistence扩展名。

Generator可以是负责生成唯一ID的Actor,其他actor可以使用ask模式获取新的id。

ClusterSingleton使用ClusterSingletonManager初始化并使用ClusterSingletonProxy获取

system.actorOf(ClusterSingletonManager.props(
singletonProps = Props(classOf[Generator]),
singletonName = "gnerator",
terminationMessage = End,
role = Some("generator")),
name = "singleton")


system.actorOf(ClusterSingletonProxy.props(
singletonPath = "/user/singleton/generator",
role = Some("generator")),
name = "generatorProxy")

关于java - Java消息调度系统设计,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27101934/

相关文章:

database - 用户定义的数据对象——什么是最好的数据存储策略?

java - 将 JSON 文件解析为 Java 对象

java - 为什么paintComponent无法正确显示?

c - 如何在 Windows 中结束线程

java - 神经网络的 C++/Java 性能?

javascript - 使用 GoJS 时提高性能

Java 和使用方法 addAll for List<Character>

java - 在 EditText 中按下 Enter 时停止关闭键盘?

c - 在 C 中使用 API 线程处理文件

c++ - 如何在C++中实现一个简单的多线程FileLogger?